diff --git a/pghoard/basebackup.py b/pghoard/basebackup.py index b600eec3..add0f444 100644 --- a/pghoard/basebackup.py +++ b/pghoard/basebackup.py @@ -16,6 +16,7 @@ import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +from pathlib import Path from queue import Empty, Queue from tempfile import NamedTemporaryFile from threading import Thread @@ -23,6 +24,7 @@ import psycopg2 +from pghoard.compressor import CompressionEvent from pghoard.rohmu import dates, errors, rohmufile from pghoard.rohmu.compat import suppress @@ -30,12 +32,13 @@ from . import common, version, wal from .basebackup_delta import DeltaBaseBackup from .common import ( - BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, extract_pghoard_bb_v2_metadata, - replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, + BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, FileType, connection_string_using_pgpass, + extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) from .patchedtarfile import tarfile from .rohmu.delta.common import EMBEDDED_FILE_SIZE +from .transfer import UploadEvent BASEBACKUP_NAME = "pghoard_base_backup" EMPTY_DIRS = [ @@ -164,25 +167,33 @@ def run(self): if self.callback_queue: # post a failure event - self.callback_queue.put({"success": False}) + self.callback_queue.put(CallbackEvent(success=False, exception=ex)) finally: self.running = False - @staticmethod - def get_paths_for_backup(basebackup_path): + def get_backup_path(self) -> Path: + """ + Build a unique backup path + + FIXME: this should look at the object storage, not the local incoming + dir. + """ i = 0 + # FIXME: self.basebackup_path should be a path relative to the + # repo root, which we can then use to derive a path relative to the + # local directory. For now, take care of it here. + basebackup_path = Path(self.basebackup_path) + incoming_basebackup_path = Path(self.basebackup_path + "_incoming") + local_repo_root = basebackup_path.parent + relative_basebackup_dir = basebackup_path.relative_to(local_repo_root) while True: - tsdir = "{}_{}".format(datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M"), i) - raw_basebackup = os.path.join(basebackup_path + "_incoming", tsdir) - compressed_basebackup = os.path.join(basebackup_path, tsdir) - # The backup directory names need not to be a sequence, so we lean towards skipping over any - # partial or leftover progress below. Make sure we only return paths if we're able to create the - # raw_basebackup directory. 
- if not os.path.exists(raw_basebackup) and not os.path.exists(compressed_basebackup): - with suppress(FileExistsError): - os.makedirs(raw_basebackup) - return raw_basebackup, compressed_basebackup + tsfilename = "{}_{}".format(datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M"), i) + basebackup_path = relative_basebackup_dir / tsfilename + local_basebackup_path = incoming_basebackup_path / tsfilename + if not local_basebackup_path.exists(): + local_basebackup_path.mkdir(exist_ok=True) + return basebackup_path, local_basebackup_path i += 1 def get_command_line(self, output_name): @@ -241,7 +252,8 @@ def basebackup_compression_pipe(self, proc, basebackup_path): "host": socket.gethostname(), } - with NamedTemporaryFile(prefix=basebackup_path, suffix=".tmp-compress") as output_obj: + # FIXME: more str operations on paths + with NamedTemporaryFile(prefix=str(basebackup_path), suffix=".tmp-compress") as output_obj: def extract_header_func(input_data): # backup_label should always be first in the tar ball @@ -293,16 +305,17 @@ def run_piped_basebackup(self): connection_string, _ = replication_connection_string_and_slot_using_pgpass(self.connection_info) start_wal_segment = wal.get_current_lsn_from_identify_system(connection_string).walfile_name - temp_basebackup_dir, compressed_basebackup = self.get_paths_for_backup(self.basebackup_path) + basebackup_path, local_basebackup_path = self.get_backup_path() command = self.get_command_line("-") self.log.debug("Starting to run: %r", command) proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) setattr(proc, "basebackup_start_time", time.monotonic()) self.pid = proc.pid - self.log.info("Started: %r, running as PID: %r, basebackup_location: %r", command, self.pid, compressed_basebackup) + self.log.info("Started: %r, running as PID: %r, basebackup_location: %r", command, self.pid, basebackup_path) - stream_target = os.path.join(temp_basebackup_dir, "data.tmp") + stream_target = local_basebackup_path / "data.tmp" + stream_target.parent.mkdir(parents=True, exist_ok=True) # catch any os level exceptions such out of disk space, so that the underlying # OS process gets properly cleaned up by check_command_success @@ -317,7 +330,6 @@ def run_piped_basebackup(self): self.metrics.unexpected_exception(e, where="PGBaseBackup") self.check_command_success(proc, stream_target) - os.rename(stream_target, compressed_basebackup) # Since we might not be able to parse the backup label we cheat with the start-wal-segment and # start-time a bit. 
The start-wal-segment is the segment currently being written before @@ -340,15 +352,18 @@ def run_piped_basebackup(self): }) metadata.update(self.metadata) - self.transfer_queue.put({ - "callback_queue": self.callback_queue, - "file_size": compressed_file_size, - "filetype": "basebackup", - "local_path": compressed_basebackup, - "metadata": metadata, - "site": self.site, - "type": "UPLOAD", - }) + self.transfer_queue.put( + UploadEvent( + file_type=FileType.Basebackup, + backup_site_key=self.site, + file_path=basebackup_path, + callback_queue=self.callback_queue, + file_size=compressed_file_size, + source_data=stream_target, + remove_after_upload=True, + metadata=metadata + ) + ) def parse_backup_label(self, backup_label_data): if isinstance(backup_label_data, str): @@ -369,9 +384,11 @@ def parse_backup_label_in_tar(self, basebackup_path): return self.parse_backup_label(content) def run_basic_basebackup(self): - basebackup_directory, _ = self.get_paths_for_backup(self.basebackup_path) - basebackup_tar_file = os.path.join(basebackup_directory, "base.tar") - command = self.get_command_line(basebackup_directory) + # FIXME: handling of path should be cleaner but keep it like this for + # now. + basebackup_path, local_basebackup_path = self.get_backup_path() + basebackup_tar_file = local_basebackup_path / "base.tar" + command = self.get_command_line(local_basebackup_path) self.log.debug("Starting to run: %r", command) proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -391,19 +408,22 @@ def run_basic_basebackup(self): if proc.poll() is not None: break self.check_command_success(proc, basebackup_tar_file) - start_wal_segment, start_time = self.parse_backup_label_in_tar(basebackup_tar_file) - self.compression_queue.put({ - "callback_queue": self.callback_queue, - "full_path": basebackup_tar_file, - "metadata": { - **self.metadata, - "start-time": start_time, - "start-wal-segment": start_wal_segment, - "active-backup-mode": self.site_config["active_backup_mode"], - }, - "type": "CLOSE_WRITE", - }) + self.compression_queue.put( + CompressionEvent( + callback_queue=self.callback_queue, + file_type=FileType.Basebackup, + file_path=basebackup_path, + source_data=basebackup_tar_file, + backup_site_key=self.site, + metadata={ + **self.metadata, + "start-time": start_time, + "start-wal-segment": start_wal_segment, + "active-backup-mode": self.site_config["active_backup_mode"], + } + ) + ) def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label): mtime = time.time() @@ -551,7 +571,7 @@ def tar_one_file( chunk_path, files_to_backup, callback_queue, - filetype="basebackup_chunk", + file_type=FileType.Basebackup_chunk, extra_metadata=None, delta_stats=None ): @@ -604,19 +624,32 @@ def tar_one_file( } if extra_metadata: metadata.update(extra_metadata) - self.transfer_queue.put({ - "callback_queue": callback_queue, - "file_size": result_size, - "filetype": filetype, - "local_path": chunk_path, - "metadata": metadata, - "site": self.site, - "type": "UPLOAD", - }) + # FIXME: handle the key computation before here + chunk_path = Path(chunk_path) + base_repo = chunk_path.parent.parent.parent + chunk_name = chunk_path.relative_to(base_repo) + if file_type == FileType.Basebackup_chunk: + middle_path = Path("basebackup_chunk") + elif file_type == FileType.Basebackup: + middle_path = Path("basebackup") + chunk_name = chunk_name.name + elif file_type == FileType.Basebackup_delta: + middle_path = Path("basebackup_delta") + chunk_name = chunk_name.name + 
self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_size=result_size, + file_type=file_type, + file_path=middle_path / chunk_name, + source_data=chunk_path, + metadata=metadata, + backup_site_key=self.site, + ) + ) # Get the name of the chunk and the name of the parent directory (ie backup "name") - chunk_name = "/".join(chunk_path.split("/")[-2:]) - return chunk_name, input_size, result_size + return str(chunk_name), input_size, result_size def wait_for_chunk_transfer_to_complete(self, chunk_count, upload_results, chunk_callback_queue, start_time): try: @@ -638,6 +671,7 @@ def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index temp_dir=temp_dir, files_to_backup=one_chunk_files, delta_stats=delta_stats, + file_type=FileType.Basebackup_chunk ) self.log.info( "Queued backup chunk %r for transfer, chunks on disk (including partial): %r, current: %r, total chunks: %r", @@ -729,8 +763,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False): if not os.path.isdir(pgdata): raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata)) - temp_base_dir, compressed_base = self.get_paths_for_backup(self.basebackup_path) - os.makedirs(compressed_base) + _, compressed_base = self.get_backup_path() data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format # Default to 2GB chunks of uncompressed data @@ -809,7 +842,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False): compression_data=self.compression_data, get_remote_basebackups_info=self.get_remote_basebackups_info, parallel=self.site_config["basebackup_threads"], - temp_base_dir=temp_base_dir, + temp_base_dir=compressed_base, compressed_base=compressed_base ) total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run( @@ -837,7 +870,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False): # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 # is reserved for special files and metadata and will be generated last. 
chunk_files = self.create_and_upload_chunks( - chunks, data_file_format, temp_base_dir, delta_stats=delta_stats + chunks, data_file_format, compressed_base, delta_stats=delta_stats ) total_size_plain = sum(item["input_size"] for item in chunk_files) @@ -943,9 +976,9 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False): self.tar_one_file( callback_queue=self.callback_queue, chunk_path=data_file_format(0), # pylint: disable=too-many-format-args - temp_dir=temp_base_dir, + temp_dir=compressed_base, files_to_backup=control_files, - filetype="basebackup", + file_type=FileType.Basebackup, extra_metadata={ **self.metadata, "end-time": backup_end_time, diff --git a/pghoard/basebackup_delta.py b/pghoard/basebackup_delta.py index 04e2c7df..f1ed4dba 100644 --- a/pghoard/basebackup_delta.py +++ b/pghoard/basebackup_delta.py @@ -10,16 +10,18 @@ import uuid from contextlib import suppress from multiprocessing.dummy import Pool +from pathlib import Path from queue import Empty, Queue from tempfile import NamedTemporaryFile from typing import Callable, Dict -from pghoard.common import (BackupFailure, BaseBackupFormat, extract_pghoard_delta_v1_metadata) +from pghoard.common import (BackupFailure, BaseBackupFormat, FileType, extract_pghoard_delta_v1_metadata) from pghoard.rohmu import rohmufile from pghoard.rohmu.dates import now from pghoard.rohmu.delta.common import (BackupManifest, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult) from pghoard.rohmu.delta.snapshot import Snapshotter from pghoard.rohmu.errors import FileNotFoundFromStorageError +from pghoard.transfer import UploadEvent class DeltaBaseBackup: @@ -169,18 +171,19 @@ def update_hash(data): "host": socket.gethostname(), } - self.transfer_queue.put({ - "callback_queue": callback_queue, - "file_size": result_size, - "filetype": "basebackup_delta", - "local_path": chunk_path, - "metadata": metadata, - "site": self.site, - "type": "UPLOAD", - "delta": { - "hexdigest": result_digest, - }, - }) + dest_path = Path("basebackup_delta") / result_digest + + self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_size=result_size, + file_type=FileType.Basebackup_delta, + backup_site_key=self.site, + metadata=metadata, + file_path=dest_path, + source_data=chunk_path + ) + ) return input_size, result_size, result_digest, skip_upload diff --git a/pghoard/common.py b/pghoard/common.py index b8ec8825..2e9852c8 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -15,7 +15,11 @@ import tarfile import tempfile import time +from dataclasses import dataclass, field from distutils.version import LooseVersion +from pathlib import Path +from queue import Queue +from typing import Any, Dict, Optional from pghoard import pgutil from pghoard.rohmu import IO_BLOCK_SIZE @@ -30,6 +34,40 @@ def __str__(self): return str(self.value) +@dataclass(frozen=True) +class CallbackEvent: + success: bool + exception: Optional[Exception] = None + opaque: Optional[Any] = None + payload: Optional[Dict[str, str]] = field(default_factory=dict) + + +# Should be changed to Queue[CallbackEvent] once +# we drop older python versions +CallbackQueue = Queue + +QuitEvent = object() + + +@enum.unique +class FileType(StrEnum): + Wal = "xlog" + Basebackup = "basebackup" + Basebackup_chunk = "basebackup_chunk" + Basebackup_delta = "basebackup_delta" + Metadata = "metadata" + Timeline = "timeline" + + +FileTypePrefixes = { + FileType.Wal: Path("xlog/"), + FileType.Timeline: Path("timeline/"), + FileType.Basebackup: Path("basebackup/"), 
+ FileType.Basebackup_chunk: Path("basebackup_chunk/"), + FileType.Basebackup_delta: Path("basebackup_delta/"), +} + + @enum.unique class BaseBackupFormat(StrEnum): v1 = "pghoard-bb-v1" diff --git a/pghoard/compressor.py b/pghoard/compressor.py index 433aa6ba..c5f1dbb9 100644 --- a/pghoard/compressor.py +++ b/pghoard/compressor.py @@ -4,23 +4,81 @@ Copyright (c) 2016 Ohmu Ltd See LICENSE for details """ +import enum import hashlib import logging import math import os import socket import time +from collections import defaultdict +from dataclasses import dataclass from io import BytesIO +from pathlib import Path from queue import Empty, Queue from tempfile import NamedTemporaryFile from threading import Event, Thread -from typing import Dict, Optional, Set +from typing import BinaryIO, Dict, Optional, Set, Union from pghoard import config as pgh_config from pghoard import wal -from pghoard.common import write_json_file +from pghoard.common import (CallbackEvent, CallbackQueue, FileType, FileTypePrefixes, QuitEvent, StrEnum, write_json_file) from pghoard.metrics import Metrics -from pghoard.rohmu import errors, rohmufile +from pghoard.rohmu import rohmufile +from pghoard.transfer import TransferQueue, UploadEvent + + +@enum.unique +class CompressionOperation(StrEnum): + Decompress = "DECOMPRESS" + Compress = "COMPRESS" + + +@dataclass(frozen=True) +class BaseCompressorEvent: + file_type: FileType + file_path: Path + backup_site_key: str + source_data: Union[BinaryIO, Path] + callback_queue: CallbackQueue + metadata: Dict[str, str] + + +@dataclass(frozen=True) +class CompressionEvent(BaseCompressorEvent): + delete_file_after_compression: bool = False + compress_to_memory: bool = False + + @property + def operation(self): + return CompressionOperation.Compress + + +@dataclass(frozen=True) +class DecompressionEvent(BaseCompressorEvent): + + destination_path: Path + + @property + def operation(self): + return CompressionOperation.Decompress + + +# Should be changed to Queue[Union[CompressionEvent, Literal[QuitEvent]] once +# we drop older python versions +CompressionQueue = Queue + + +@dataclass(frozen=True) +class WalFileDeletionEvent: + backup_site_key: str + file_path: Path + file_type: Optional[FileType] = None + + +# Should be changed to Queue[WalFileDeletionEvent] once +# we drop older python versions +WalFileDeletionQueue = Queue class CompressorThread(Thread): @@ -30,8 +88,8 @@ class CompressorThread(Thread): def __init__( self, config_dict: Dict, - compression_queue: Queue, - transfer_queue: Queue, + compression_queue: CompressionQueue, + transfer_queue: TransferQueue, metrics: Metrics, critical_failure_event: Event, wal_file_deletion_queue: Queue, @@ -48,33 +106,19 @@ def __init__( self.critical_failure_event = critical_failure_event self.log.debug("Compressor initialized") - def get_compressed_file_path(self, site, filetype, original_path): - if filetype == "basebackup": - rest, _ = os.path.split(original_path) - rest, backupname = os.path.split(rest) - object_path = os.path.join("basebackup", backupname) - else: - object_path = os.path.join("xlog", os.path.basename(original_path)) - - cfp = os.path.join(self.config["backup_location"], self.config["backup_sites"][site]["prefix"], object_path) - self.log.debug("compressed_file_path for %r is %r", original_path, cfp) - return cfp - - def find_site_for_file(self, filepath): - # Formats like: - # /home/foo/t/default/xlog/000000010000000000000014 - # /home/foo/t/default/basebackup/2015-02-06_3/base.tar - for site in 
self.config["backup_sites"]: - site_path = os.path.join(self.config["backup_location"], self.config["backup_sites"][site]["prefix"]) - if filepath.startswith(site_path): - return site - raise errors.InvalidConfigurationError("Could not find backup site for {}".format(filepath)) + def get_compressed_file_dir(self, site): + # FIXME: this is shared with pghoard.py and convoluted + prefix = Path(self.config["backup_sites"][site]["prefix"]) + path = Path(self.config["backup_location"]) / prefix + if not path.exists(): + path.mkdir() + return path def compression_algorithm(self): return self.config["compression"]["algorithm"] def run(self): - event: Optional[Dict] = None + event: Optional[CompressionEvent] = None while self.running: if event is None: attempt = 1 @@ -83,33 +127,29 @@ def run(self): except Empty: continue try: - if event["type"] == "QUIT": + if event is QuitEvent: break - if event["type"] == "DECOMPRESSION": + if event.operation == CompressionOperation.Decompress: self.handle_decompression_event(event) - else: - filetype = self.get_event_filetype(event) - if filetype: - self.handle_event(event, filetype) - elif "callback_queue" in event and event["callback_queue"]: + elif event.operation == CompressionOperation.Compress: + file_type = event.file_type + if file_type: + self.handle_event(event) + elif event.callback_queue: self.log.debug("Returning success for unrecognized and ignored event: %r", event) - event["callback_queue"].put({"success": True, "opaque": event.get("opaque")}) + event.callback_queue.put(CallbackEvent(success=True)) event = None except Exception as ex: # pylint: disable=broad-except - if "blob" in event: - log_event = dict(event, blob="<{} bytes>".format(len(event["blob"]))) - else: - log_event = event attempt_message = "" if attempt < self.MAX_FAILED_RETRY_ATTEMPTS: attempt_message = f" (attempt {attempt} of {self.MAX_FAILED_RETRY_ATTEMPTS})" - self.log.exception("Problem handling%s: %r: %s: %s", attempt_message, log_event, ex.__class__.__name__, ex) + self.log.exception("Problem handling%s: %r: %s: %s", attempt_message, event, ex.__class__.__name__, ex) self.metrics.unexpected_exception(ex, where="compressor_run") if attempt >= self.MAX_FAILED_RETRY_ATTEMPTS: # When this happens, execution must be stopped in order to prevent data corruption - if "callback_queue" in event and event["callback_queue"]: - event["callback_queue"].put({"success": False, "exception": ex, "opaque": event.get("opaque")}) + if event.callback_queue: + event.callback_queue.put(CallbackEvent(success=False, exception=ex)) self.running = False self.metrics.unexpected_exception(ex, where="compressor_run_critical") self.critical_failure_event.set() @@ -122,69 +162,52 @@ def run(self): self.log.debug("Quitting Compressor") - def get_event_filetype(self, event): - close_write = event["type"] == "CLOSE_WRITE" - move = event["type"] == "MOVE" and event["src_path"].endswith(".partial") - - if close_write and os.path.basename(event["full_path"]) == "base.tar": - return "basebackup" - elif (move or close_write) and wal.TIMELINE_RE.match(os.path.basename(event["full_path"])): - return "timeline" - # for xlog we get both move and close_write on pg10+ (in that order: the write is from an fsync by name) - # -> for now we pick MOVE because that's compatible with pg9.x, but this leaves us open to a problem with - # in case the compressor is so fast to compress and then unlink the file that the fsync by name does - # not find the file anymore. This ends up in a hickup in pg_receivewal. 
- # TODO: when we drop pg9.x support, switch to close_write here and in all other places where we generate an xlog/WAL - # compression event: https://github.com/aiven/pghoard/commit/29d2ee76139e8231b40619beea0703237eb6b9cc - elif move and wal.WAL_RE.match(os.path.basename(event["full_path"])): - return "xlog" - - return None - def handle_decompression_event(self, event): - with open(event["local_path"], "wb") as output_obj: + with open(event.destination_path, "wb") as output_obj: rohmufile.read_file( - input_obj=BytesIO(event["blob"]), + input_obj=event.source_data, output_obj=output_obj, - metadata=event.get("metadata"), - key_lookup=pgh_config.key_lookup_for_site(self.config, event["site"]), - log_func=self.log.debug, + metadata=event.metadata, + key_lookup=pgh_config.key_lookup_for_site(self.config, event.backup_site_key), + log_func=self.log.debug ) - if "callback_queue" in event: - event["callback_queue"].put({"success": True, "opaque": event.get("opaque")}) + if event.callback_queue: + event.callback_queue.put(CallbackEvent(success=True)) - def handle_event(self, event, filetype): + def handle_event(self, event): # pylint: disable=redefined-variable-type + file_type = event.file_type rsa_public_key = None - site = event.get("site") - if not site: - site = self.find_site_for_file(event["full_path"]) - + site = event.backup_site_key encryption_key_id = self.config["backup_sites"][site]["encryption_key_id"] if encryption_key_id: rsa_public_key = self.config["backup_sites"][site]["encryption_keys"][encryption_key_id]["public"] - - compressed_blob = None - if event.get("compress_to_memory"): + remove_after_upload = False + if event.compress_to_memory: output_obj = BytesIO() compressed_filepath = None + output_data = output_obj else: - compressed_filepath = self.get_compressed_file_path(site, filetype, event["full_path"]) + remove_after_upload = True + type_prefix = FileTypePrefixes[event.file_type] + compressed_filepath = self.get_compressed_file_dir(site) / type_prefix / event.file_path.name + compressed_filepath.parent.mkdir(exist_ok=True) output_obj = NamedTemporaryFile( dir=os.path.dirname(compressed_filepath), prefix=os.path.basename(compressed_filepath), suffix=".tmp-compress" ) - - input_obj = event.get("input_data") - if not input_obj: - input_obj = open(event["full_path"], "rb") + output_data = compressed_filepath + if not isinstance(event.source_data, BytesIO): + input_obj = open(event.source_data, "rb") + else: + input_obj = event.source_data with output_obj, input_obj: hash_algorithm = self.config["hash_algorithm"] hasher = None - if filetype == "xlog": - wal.verify_wal(wal_name=os.path.basename(event["full_path"]), fileobj=input_obj) + if file_type == FileType.Wal: + wal.verify_wal(wal_name=event.file_path.name, fileobj=input_obj) hasher = hashlib.new(hash_algorithm) original_file_size, compressed_file_size = rohmufile.write_file( @@ -200,9 +223,9 @@ def handle_event(self, event, filetype): if compressed_filepath: os.link(output_obj.name, compressed_filepath) else: - compressed_blob = output_obj.getvalue() + output_data = BytesIO(output_obj.getvalue()) - metadata = event.get("metadata", {}) + metadata = event.metadata metadata.update({ "pg-version": self.config["backup_sites"][site].get("pg_version"), "compression-algorithm": self.config["compression"]["algorithm"], @@ -215,14 +238,16 @@ def handle_event(self, event, filetype): metadata["hash-algorithm"] = hash_algorithm if encryption_key_id: metadata.update({"encryption-key-id": encryption_key_id}) + # FIXME: why do we dump 
this ? Intermediate state stored on disk cannot + # be relied on. if compressed_filepath: - metadata_path = compressed_filepath + ".metadata" + metadata_path = compressed_filepath.with_name(compressed_filepath.name + ".metadata") write_json_file(metadata_path, metadata) self.set_state_defaults_for_site(site) - self.state[site][filetype]["original_data"] += original_file_size - self.state[site][filetype]["compressed_data"] += compressed_file_size - self.state[site][filetype]["count"] += 1 + self.state[site][str(file_type)]["original_data"] += original_file_size + self.state[site][str(file_type)]["compressed_data"] += compressed_file_size + self.state[site][str(file_type)]["count"] += 1 if original_file_size: size_ratio = compressed_file_size / original_file_size self.metrics.gauge( @@ -231,36 +256,26 @@ def handle_event(self, event, filetype): tags={ "algorithm": self.config["compression"]["algorithm"], "site": site, - "type": filetype, + "type": file_type, } ) - transfer_object = { - "callback_queue": event.get("callback_queue"), - "file_size": compressed_file_size, - "filetype": filetype, - "metadata": metadata, - "opaque": event.get("opaque"), - "site": site, - "type": "UPLOAD", - } - if compressed_filepath: - transfer_object["local_path"] = compressed_filepath - else: - transfer_object["blob"] = compressed_blob - transfer_object["local_path"] = event["full_path"] - - if event.get("delete_file_after_compression", True): - if filetype == "xlog": - delete_request = { - "type": "delete_file", - "site": site, - "local_path": event["full_path"], - } - self.log.info("Adding to Uncompressed WAL file to deletion queue: %s", event["full_path"]) + transfer_object = UploadEvent( + callback_queue=event.callback_queue, + file_size=compressed_file_size, + file_type=event.file_type, + metadata=metadata, + backup_site_key=site, + file_path=event.file_path, + source_data=output_data, + remove_after_upload=remove_after_upload + ) + if event.delete_file_after_compression: + if file_type == FileType.Wal: + delete_request = WalFileDeletionEvent(backup_site_key=site, file_path=event.source_data) + self.log.info("Adding to Uncompressed WAL file to deletion queue: %s", event.source_data) self.wal_file_deletion_queue.put(delete_request) else: - os.unlink(event["full_path"]) - + os.unlink(event.source_data) self.transfer_queue.put(transfer_object) return True @@ -298,7 +313,7 @@ class WALFileDeleterThread(Thread): def __init__( self, config: Dict, - wal_file_deletion_queue: Queue, + wal_file_deletion_queue: WalFileDeletionQueue, metrics: Metrics, ): super().__init__() @@ -307,7 +322,7 @@ def __init__( self.metrics = metrics self.wal_file_deletion_queue = wal_file_deletion_queue self.running = True - self.to_be_deleted_files: Dict[str, Set[str]] = {} + self.to_be_deleted_files: Dict[str, Set[str]] = defaultdict(set) self.log.debug("WALFileDeleter initialized") def run(self): @@ -330,16 +345,15 @@ def run(self): continue try: - if event["type"] == "QUIT": + if event is QuitEvent: break - if event["type"] == "delete_file": - site = event["site"] - local_path = event["local_path"] - if site not in self.to_be_deleted_files: - self.to_be_deleted_files[site] = set() - self.to_be_deleted_files[site].add(local_path) - else: - raise RuntimeError("Received bad event") + site = event.backup_site_key + local_path = event.file_path + if not local_path: + raise ValueError("file_path must not be None") + if not site: + raise ValueError("backup_site_key must not be None") + self.to_be_deleted_files[site].add(local_path) 
self.deleted_unneeded_files() except Exception as ex: # pylint: disable=broad-except self.log.exception("Problem handling event %r: %s: %s", event, ex.__class__.__name__, ex) @@ -348,7 +362,6 @@ def run(self): time.sleep(0.1) # If we have a problem, just keep running for now, at the worst we accumulate files continue - self.log.debug("Quitting WALFileDeleter") def deleted_unneeded_files(self): diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 46b08984..dc3675b3 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -20,6 +20,7 @@ import time from contextlib import closing from dataclasses import dataclass +from pathlib import Path from queue import Empty, Queue from threading import Event from typing import Dict, Optional @@ -29,32 +30,61 @@ from pghoard import config, logutil, metrics, version, wal from pghoard.basebackup import PGBaseBackup from pghoard.common import ( - BaseBackupFormat, BaseBackupMode, create_alert_file, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_v1_metadata, - get_object_storage_config, replication_connection_string_and_slot_using_pgpass, write_json_file + BaseBackupFormat, BaseBackupMode, CallbackEvent, FileType, FileTypePrefixes, create_alert_file, + extract_pghoard_bb_v2_metadata, extract_pghoard_delta_v1_metadata, get_object_storage_config, + replication_connection_string_and_slot_using_pgpass, write_json_file +) +from pghoard.compressor import ( + CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionQueue ) -from pghoard.compressor import CompressorThread, WALFileDeleterThread from pghoard.receivexlog import PGReceiveXLog from pghoard.rohmu import dates, get_transfer, rohmufile from pghoard.rohmu.compat import suppress from pghoard.rohmu.dates import now as utc_now from pghoard.rohmu.errors import (FileNotFoundFromStorageError, InvalidConfigurationError) from pghoard.rohmu.inotify import InotifyWatcher -from pghoard.transfer import TransferAgent +from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent +from pghoard.walreceiver import WALReceiver from pghoard.webserver import WebServer -# Imported this way because WALReceiver requires an unreleased version of psycopg2 -try: - from pghoard.walreceiver import WALReceiver -except ImportError: - WALReceiver = None - @dataclass class DeltaBaseBackupFailureInfo: - last_failed_time: datetime + last_failed_time: datetime.datetime retries: int = 0 +class InotifyAdapter: + def __init__(self, queue): + self.queue = queue + self.path_to_site = {} + + def adapt_event(self, inotify_event): + full_path = Path(inotify_event["full_path"]) + backup_site_key = self.path_to_site[inotify_event["watched_path"]] + # Ignore .partial files + if full_path.suffix == ".partial": + return None + if full_path.suffix == ".history": + filetype = FileType.Timeline + else: + filetype = FileType.Wal + return CompressionEvent( + file_type=filetype, + file_path=FileTypePrefixes[filetype] / full_path.name, + delete_file_after_compression=True, + backup_site_key=backup_site_key, + source_data=full_path, + callback_queue=None, + metadata={} + ) + + def put(self, item): + adapted_event = self.adapt_event(item) + if adapted_event is not None: + self.queue.put(adapted_event) + + class PGHoard: def __init__(self, config_path): self.metrics: Optional[metrics.Metrics] = None @@ -62,9 +92,9 @@ def __init__(self, config_path): self.log_level = None self.running = True self.config_path = config_path - self.compression_queue = Queue() - self.transfer_queue = Queue() - 
self.wal_file_deletion_queue = Queue() + self.compression_queue = CompressionQueue() + self.transfer_queue = TransferQueue() + self.wal_file_deletion_queue = WalFileDeletionQueue() self.syslog_handler = None self.basebackups = {} self.basebackups_callbacks = {} @@ -101,8 +131,8 @@ def __init__(self, config_path): signal.signal(signal.SIGTERM, self.handle_exit_signal) self.time_of_last_backup_check = {} self.requested_basebackup_sites = set() - - self.inotify = InotifyWatcher(self.compression_queue) + self.inotify_adapter = InotifyAdapter(self.compression_queue) + self.inotify = InotifyWatcher(self.inotify_adapter) self.webserver = WebServer( self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics ) @@ -125,7 +155,6 @@ def __init__(self, config_path): for _ in range(self.config["transfer"]["thread_count"]): ta = TransferAgent( config=self.config, - compression_queue=self.compression_queue, mp_manager=self.mp_manager, transfer_queue=self.transfer_queue, metrics=self.metrics, @@ -164,7 +193,7 @@ def create_basebackup(self, site, connection_info, basebackup_path, callback_que pg_version_server = self.check_pg_server_version(connection_string, site) if not self.check_pg_versions_ok(site, pg_version_server, "pg_basebackup"): if callback_queue: - callback_queue.put({"success": False}) + callback_queue.put(CallbackEvent(success=True)) return thread = PGBaseBackup( @@ -212,7 +241,16 @@ def receivexlog_listener(self, site, connection_info, wal_directory): if not self.check_pg_versions_ok(site, pg_version_server, "pg_receivexlog"): return - self.inotify.add_watch(wal_directory) + # Depending on the PG version, we must react either to MOVE (pre-PG10) + # or CLOSE_WRITE (PG10+) + if pg_version_server >= 100000: + events = ["IN_CLOSE_WRITE"] + else: + events = ["IN_MOVED_TO", "IN_MOVED_FROM"] + events += ["IN_DELETE_SELF"] + + self.inotify.add_watch(wal_directory, events) + self.inotify_adapter.path_to_site[wal_directory] = site thread = PGReceiveXLog( config=self.config, connection_string=connection_string, @@ -534,14 +572,15 @@ def startup_walk_for_missed_files(self): if not wal.WAL_RE.match(filename) and not wal.TIMELINE_RE.match(filename): self.log.warning("Found invalid file %r from incoming xlog directory", full_path) continue - - compression_event = { - "delete_file_after_compression": True, - "full_path": full_path, - "site": site, - "src_path": "{}.partial", - "type": "MOVE", - } + compression_event = CompressionEvent( + file_type=FileType.Wal, + file_path=FileTypePrefixes[FileType.Wal] / filename, + delete_file_after_compression=True, + backup_site_key=site, + source_data=Path(full_path), + callback_queue=None, + metadata={} + ) self.log.debug("Found: %r when starting up, adding to compression queue", compression_event) self.compression_queue.put(compression_event) @@ -559,14 +598,15 @@ def startup_walk_for_missed_files(self): with open(metadata_path, "r") as fp: metadata = json.load(fp) - transfer_event = { - "file_size": os.path.getsize(full_path), - "filetype": "xlog" if is_xlog else "timeline", - "local_path": full_path, - "metadata": metadata, - "site": site, - "type": "UPLOAD", - } + transfer_event = UploadEvent( + file_type=FileType.Wal, + backup_site_key=site, + file_size=os.path.getsize(full_path), + file_path=FileTypePrefixes[FileType.Wal] / filename, + source_data=Path(full_path), + callback_queue=None, + metadata=metadata + ) self.log.debug("Found: %r when starting up, adding to transfer queue", transfer_event) 
self.transfer_queue.put(transfer_event) @@ -623,7 +663,7 @@ def handle_site(self, site, site_config): if site in self.basebackups: try: result = self.basebackups_callbacks[site].get(block=False) - if result["success"]: + if result.success: # No matter which mode, if succeeded reset the counter self.delta_backup_failures.pop(site, None) elif site_config["basebackup_mode"] == BaseBackupMode.delta: diff --git a/pghoard/rohmu/delta/snapshot.py b/pghoard/rohmu/delta/snapshot.py index a3e53976..e4a4d8ef 100644 --- a/pghoard/rohmu/delta/snapshot.py +++ b/pghoard/rohmu/delta/snapshot.py @@ -180,17 +180,17 @@ def snapshot(self, *, progress: Optional[Progress] = None, reuse_old_snapshotfil progress.start(3) if self.src_iterate_func: - src_dirs = set() - src_files = set() + src_dirs_set = set() + src_files_set = set() for item in self.src_iterate_func(): path = Path(item) if path.is_file() and not path.is_symlink(): - src_files.add(path.relative_to(self.src)) + src_files_set.add(path.relative_to(self.src)) elif path.is_dir(): - src_dirs.add(path.relative_to(self.src)) + src_dirs_set.add(path.relative_to(self.src)) - src_dirs = sorted(src_dirs | {p.parent for p in src_files}) - src_files = sorted(src_files) + src_dirs = sorted(src_dirs_set | {p.parent for p in src_files_set}) + src_files = sorted(src_files_set) else: src_dirs, src_files = self._list_dirs_and_files(self.src) diff --git a/pghoard/rohmu/inotify.py b/pghoard/rohmu/inotify.py index 12f6957f..5d7b9a35 100644 --- a/pghoard/rohmu/inotify.py +++ b/pghoard/rohmu/inotify.py @@ -65,10 +65,11 @@ def __init__(self, compression_queue): self.timeout = 1.0 self.log.debug("InotifyWatcher initialized") - def add_watch(self, path): + def add_watch(self, path, events=None): mask = 0 - for v in event_types.values(): - mask |= v + events = events or event_types.keys() + for key in events: + mask |= event_types[key] watch = self.libc.inotify_add_watch(self.fd, c_char_p(path.encode("utf8")), c_uint32(mask)) if watch < 0: errno = ctypes.get_errno() @@ -109,6 +110,7 @@ def create_event(self, wd, mask, cookie, name): return decoded_name = name.decode("utf8") + watched_path = self.watch_to_path[wd] full_path = os.path.join(self.watch_to_path[wd], decoded_name) if mask & event_types["IN_CREATE"] > 0: @@ -117,10 +119,11 @@ def create_event(self, wd, mask, cookie, name): elif mask & event_types["IN_CLOSE_WRITE"] > 0: # file was open for writing and was closed self.log_event("IN_CLOSE_WRITE", full_path) - self.compression_queue.put({"type": "CLOSE_WRITE", "full_path": full_path}) + self.compression_queue.put({"type": "CLOSE_WRITE", "full_path": full_path, "watched_path": watched_path}) elif mask & event_types["IN_DELETE"] > 0: self.log_event("IN_DELETE", full_path) - self.compression_queue.put({"type": "DELETE", "full_path": full_path}) + self.compression_queue.put({"type": "DELETE", "full_path": full_path, "watched_path": watched_path}) + elif mask & event_types["IN_DELETE_SELF"] > 0: # the monitored directory was deleted self.log_event("IN_DELETE_SELF", full_path) @@ -134,9 +137,14 @@ def create_event(self, wd, mask, cookie, name): self.log_event("IN_MOVED_TO", full_path) src_path = self.cookies.pop(cookie, None) if src_path: - self.compression_queue.put({"type": "MOVE", "full_path": full_path, "src_path": src_path}) + self.compression_queue.put({ + "type": "MOVE", + "full_path": full_path, + "src_path": src_path, + "watched_path": watched_path + }) else: - self.compression_queue.put({"type": "CREATE", "full_path": full_path}) + self.compression_queue.put({"type": 
"CREATE", "full_path": full_path, "watched_path": watched_path}) def run(self): self.log.debug("Starting InotifyWatcher") diff --git a/pghoard/rohmu/object_storage/swift.py b/pghoard/rohmu/object_storage/swift.py index 127299fe..30a90535 100644 --- a/pghoard/rohmu/object_storage/swift.py +++ b/pghoard/rohmu/object_storage/swift.py @@ -129,14 +129,14 @@ def _metadata_for_key(self, key, *, resolve_manifest=False): def iter_key(self, key, *, with_metadata=True, deep=False, include_key=False): path = self.format_key_for_backend(key, trailing_slash=not include_key) self.log.debug("Listing path %r", path) - if deep: + if not deep: kwargs = {"delimiter": "/"} else: kwargs = {} _, results = self.conn.get_container(self.container_name, prefix=path, full_listing=True, **kwargs) for item in results: if "subdir" in item: - yield IterKeyItem(type=KEY_TYPE_PREFIX, value=self.format_key_from_backend(item["name"]).rstrip("/")) + yield IterKeyItem(type=KEY_TYPE_PREFIX, value=self.format_key_from_backend(item["subdir"]).rstrip("/")) else: if with_metadata: metadata = self._metadata_for_key(item["name"], resolve_manifest=True) @@ -250,44 +250,17 @@ def store_file_from_memory(self, key, memstring, metadata=None, cache_control=No ) def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cache_control=None, mimetype=None): - if cache_control is not None: - raise NotImplementedError("SwiftTransfer: cache_control support not implemented") - - if multipart: - # Start by trying to delete the file - if it's a potential multipart file we need to manually - # delete it, otherwise old segments won't be cleaned up by anything. Note that we only issue - # deletes with the store_file_from_disk functions, store_file_from_memory is used to upload smaller - # chunks. 
- with suppress(FileNotFoundFromStorageError): - self.delete_key(key) - key = self.format_key_for_backend(key) - headers = self._metadata_to_headers(self.sanitize_metadata(metadata)) obsz = os.path.getsize(filepath) with open(filepath, "rb") as fp: - if obsz <= self.segment_size: - self.log.debug("Uploading %r to %r (%r bytes)", filepath, key, obsz) - self.conn.put_object(self.container_name, key, contents=fp, content_length=obsz, headers=headers) - return - - # Segmented transfer - # upload segments of a file like `backup-bucket/site-name/basebackup/2016-03-22_0` - # to as `backup-bucket/site-name/basebackup_segments/2016-03-22_0/{:08x}` - segment_no = 0 - segment_path = "{}_segments/{}/".format(os.path.dirname(key), os.path.basename(key)) - segment_key_format = "{}{{:08x}}".format(segment_path).format - remaining = obsz - while remaining > 0: - this_segment_size = min(self.segment_size, remaining) - remaining -= this_segment_size - segment_no += 1 - self.log.debug("Uploading segment %r of %r to %r (%r bytes)", segment_no, filepath, key, this_segment_size) - segment_key = segment_key_format(segment_no) # pylint: disable=too-many-format-args - self.conn.put_object( - self.container_name, segment_key, contents=fp, content_length=this_segment_size, content_type=mimetype - ) - self.log.info("Uploaded %r segments of %r to %r", segment_no, key, segment_path) - headers["x-object-manifest"] = "{}/{}".format(self.container_name, segment_path.lstrip("/")) - self.conn.put_object(self.container_name, key, contents="", headers=headers, content_length=0) + self._store_file_contents( + key, + fp, + metadata=metadata, + multipart=multipart, + cache_control=cache_control, + mimetype=mimetype, + content_length=obsz + ) def get_or_create_container(self, container_name): start_time = time.monotonic() @@ -304,3 +277,75 @@ def get_or_create_container(self, container_name): return container_name raise return container_name + + def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs): + source_key = self.format_key_for_backend(source_key) + destination_key = "/".join((self.container_name, self.format_key_for_backend(destination_key))) + headers = self._metadata_to_headers(self.sanitize_metadata(metadata)) + if metadata: + headers["X-Fresh-Metadata"] = True + self.conn.copy_object(self.container_name, source_key, destination=destination_key, headers=headers) + + def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None): + metadata = metadata or {} + self._store_file_contents( + key, + fd, + cache_control=cache_control, + metadata=metadata, + mimetype=mimetype, + upload_progress_fn=upload_progress_fn, + multipart=True, + content_length=metadata.get("Content-Length") + ) + + def _store_file_contents( + self, + key, + fp, + cache_control=None, + metadata=None, + mimetype=None, + upload_progress_fn=None, + multipart=None, + content_length=None + ): + if cache_control is not None: + raise NotImplementedError("SwiftTransfer: cache_control support not implemented") + + if multipart: + # Start by trying to delete the file - if it's a potential multipart file we need to manually + # delete it, otherwise old segments won't be cleaned up by anything. Note that we only issue + # deletes with the store_file_from_disk functions, store_file_from_memory is used to upload smaller + # chunks. 
+ with suppress(FileNotFoundFromStorageError): + self.delete_key(key) + key = self.format_key_for_backend(key) + headers = self._metadata_to_headers(self.sanitize_metadata(metadata)) + # Fall back to the "one segment" if possible + if (not multipart) or (not content_length) or content_length <= self.segment_size: + self.log.debug("Uploading %r to %r (%r bytes)", fp, key, content_length) + self.conn.put_object(self.container_name, key, contents=fp, content_length=content_length, headers=headers) + return + + # Segmented transfer + # upload segments of a file like `backup-bucket/site-name/basebackup/2016-03-22_0` + # to as `backup-bucket/site-name/basebackup_segments/2016-03-22_0/{:08x}` + segment_no = 0 + segment_path = "{}_segments/{}/".format(os.path.dirname(key), os.path.basename(key)) + segment_key_format = "{}{{:08x}}".format(segment_path).format + remaining = content_length + while remaining > 0: + this_segment_size = min(self.segment_size, remaining) + remaining -= this_segment_size + segment_no += 1 + self.log.debug("Uploading segment %r of %r to %r (%r bytes)", segment_no, fp, key, this_segment_size) + segment_key = segment_key_format(segment_no) # pylint: disable=too-many-format-args + self.conn.put_object( + self.container_name, segment_key, contents=fp, content_length=this_segment_size, content_type=mimetype + ) + if upload_progress_fn: + upload_progress_fn(content_length - remaining) + self.log.info("Uploaded %r segments of %r to %r", segment_no, key, segment_path) + headers["x-object-manifest"] = "{}/{}".format(self.container_name, segment_path.lstrip("/")) + self.conn.put_object(self.container_name, key, contents="", headers=headers, content_length=0) diff --git a/pghoard/transfer.py b/pghoard/transfer.py index a32c66cb..5808a957 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -4,29 +4,103 @@ Copyright (c) 2016 Ohmu Ltd See LICENSE for details """ +import dataclasses +import enum import logging import os import time +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path from queue import Empty from threading import Lock, Thread +from typing import Any, BinaryIO, Dict, Optional, Union -from pghoard.common import create_alert_file, get_object_storage_config +from pghoard.common import ( + CallbackEvent, CallbackQueue, FileType, Queue, QuitEvent, StrEnum, create_alert_file, get_object_storage_config +) from pghoard.fetcher import FileFetchManager from pghoard.rohmu import get_transfer from pghoard.rohmu.compat import suppress -from pghoard.rohmu.errors import (FileNotFoundFromStorageError, LocalFileIsRemoteFileError) +from pghoard.rohmu.errors import FileNotFoundFromStorageError _STATS_LOCK = Lock() _last_stats_transmit_time = 0 +@enum.unique +class TransferOperation(StrEnum): + Download = "download" + Upload = "upload" + List = "list" + Metadata = "metadata" + + +@dataclass(frozen=True) +class BaseTransferEvent: + backup_site_key: str + file_type: FileType + file_path: Path + callback_queue: CallbackQueue + + +@dataclass(frozen=True) +class UploadEvent(BaseTransferEvent): + source_data: Union[BinaryIO, Path] + metadata: Dict[str, str] + file_size: Optional[int] + remove_after_upload: bool = True + retry_number: int = 0 + + @property + def operation(self): + return TransferOperation.Upload + + +@dataclass(frozen=True) +class DownloadEvent(BaseTransferEvent): + + destination_path: Path + opaque: Optional[Any] = None + file_size: Optional[int] = 0 + + @property + def operation(self): + return TransferOperation.Download + + 
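Editor's note (not part of the patch): with these dataclasses, producers stop building ad-hoc dicts and enqueue typed events instead. Below is a minimal sketch of handing an already-compressed WAL segment to the transfer queue, using only names introduced by this patch (`UploadEvent`, `TransferQueue`, `FileType`, `FileTypePrefixes`, `CallbackQueue`); the site name, segment name and local path are invented for illustration.

```python
# Illustrative sketch only -- the site name and paths are made up.
from pathlib import Path

from pghoard.common import CallbackQueue, FileType, FileTypePrefixes
from pghoard.transfer import TransferQueue, UploadEvent

callback_queue = CallbackQueue()
transfer_queue = TransferQueue()

transfer_queue.put(
    UploadEvent(
        backup_site_key="default",  # hypothetical site
        file_type=FileType.Wal,
        file_path=FileTypePrefixes[FileType.Wal] / "000000010000000000000014",
        callback_queue=callback_queue,
        source_data=Path("/var/lib/pghoard/default/xlog/000000010000000000000014"),
        metadata={"compression-algorithm": "snappy"},
        file_size=16 * 1024 * 1024,
        # remove_after_upload defaults to True, so the local file is unlinked
        # once a TransferAgent thread has stored it.
    )
)

# When the upload finishes, TransferAgent posts a CallbackEvent on
# callback_queue; result.success and result.payload["file_size"] replace the
# old {"success": ..., "file_size": ...} dict fields.
# result = callback_queue.get()  # blocks until a TransferAgent has run
```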
+@dataclass(frozen=True) +class ListEvent(BaseTransferEvent): + @property + def operation(self): + return TransferOperation.List + + +@dataclass(frozen=True) +class MetadataEvent(BaseTransferEvent): + @property + def operation(self): + return TransferOperation.Metadata + + +OperationEvents = { + TransferOperation.Download: DownloadEvent, + TransferOperation.Upload: UploadEvent, + TransferOperation.List: ListEvent, + TransferOperation.Metadata: MetadataEvent +} + +# Should be changed to Queue[Union[CompressionEvent, Literal[QuitEvent]] once +# we drop older python versions +TransferQueue = Queue + + class TransferAgent(Thread): - def __init__(self, config, compression_queue, mp_manager, transfer_queue, metrics, shared_state_dict): + def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict): super().__init__() self.log = logging.getLogger("TransferAgent") self.config = config self.metrics = metrics - self.compression_queue = compression_queue self.mp_manager = mp_manager self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage) self.transfer_queue = transfer_queue @@ -114,38 +188,40 @@ def run(self): file_to_transfer = self.transfer_queue.get(timeout=1.0) except Empty: continue - if file_to_transfer["type"] == "QUIT": + if file_to_transfer is QuitEvent: break - site = file_to_transfer["site"] - filetype = file_to_transfer["filetype"] - self.log.debug( - "Starting to %r %r, size: %r", file_to_transfer["type"], file_to_transfer["local_path"], - file_to_transfer.get("file_size", "unknown") - ) - file_to_transfer.setdefault("prefix", self.config["backup_sites"][site]["prefix"]) + site = file_to_transfer.backup_site_key + filetype = file_to_transfer.file_type + self.log.info("Processing TransferEvent %r", file_to_transfer) start_time = time.monotonic() - key = self.form_key_path(file_to_transfer) - oper = file_to_transfer["type"].lower() - oper_func = getattr(self, "handle_" + oper, None) - if oper_func is None: - self.log.warning("Invalid operation %r", file_to_transfer["type"]) - continue - - result = oper_func(site, key, file_to_transfer) + key = str(Path(file_to_transfer.backup_site_key) / file_to_transfer.file_path) + oper = str(file_to_transfer.operation) + if file_to_transfer.operation == TransferOperation.Download: + result = self.handle_download(site, key, file_to_transfer) + elif file_to_transfer.operation == TransferOperation.Upload: + result = self.handle_upload(site, key, file_to_transfer) + elif file_to_transfer.operation == TransferOperation.List: + result = self.handle_list(site, key, file_to_transfer) + elif file_to_transfer.operation == TransferOperation.Metadata: + result = self.handle_metadata(site, key, file_to_transfer) + else: + raise TypeError(f"Invalid transfer operation {file_to_transfer.operation}") # increment statistics counters self.set_state_defaults_for_site(site) - oper_size = file_to_transfer.get("file_size", 0) - if result["success"]: - filename = os.path.basename(file_to_transfer["local_path"]) - if oper == "upload": - if filetype == "xlog": + if not result: + self.state[site][oper][filetype]["failures"] += 1 + continue + oper_size = result.payload.get("file_size", 0) + if result.success: + filename = file_to_transfer.file_path.name + if oper == TransferOperation.Upload: + if filetype == FileType.Wal: self.state[site][oper]["xlog"]["xlogs_since_basebackup"] += 1 - elif filetype in {"basebackup", "basebackup_delta"}: + elif filetype in {FileType.Basebackup, FileType.Basebackup_chunk}: # 
reset corresponding xlog stats at basebackup self.state[site][oper]["xlog"]["xlogs_since_basebackup"] = 0 - self.metrics.gauge( "pghoard.xlogs_since_basebackup", self.state[site][oper]["xlog"]["xlogs_since_basebackup"], @@ -155,6 +231,7 @@ def run(self): self.state[site][oper][filetype]["last_success"] = time.monotonic() self.state[site][oper][filetype]["count"] += 1 self.state[site][oper][filetype]["data"] += oper_size + self.metrics.gauge( "pghoard.total_upload_size", self.state[site][oper][filetype]["data"], @@ -168,25 +245,24 @@ def run(self): else: self.state[site][oper][filetype]["failures"] += 1 - if oper in {"download", "upload"}: + if file_to_transfer.operation in {TransferOperation.Download, TransferOperation.Upload}: self.metrics.increase( "pghoard.{}_size".format(oper), inc_value=oper_size, tags={ - "result": "ok" if result["success"] else "failed", + "result": "ok" if result.success else "failed", "type": filetype, "site": site, } ) # push result to callback_queue if provided - if result.get("call_callback", True) and file_to_transfer.get("callback_queue"): - file_to_transfer["callback_queue"].put(result) + if file_to_transfer.callback_queue: + file_to_transfer.callback_queue.put(result) self.log.info( - "%r %stransfer of key: %r, size: %r, origin: %r took %.3fs", file_to_transfer["type"], - "FAILED " if not result["success"] else "", key, oper_size, - file_to_transfer.get("metadata", {}).get("host"), + "%r %stransfer of key: %r, size: %r, took %.3fs", oper, "FAILED " if not result.success else "", key, + oper_size, time.monotonic() - start_time ) @@ -197,93 +273,94 @@ def handle_list(self, site, key, file_to_transfer): try: storage = self.get_object_storage(site) items = storage.list_path(key) - file_to_transfer["file_size"] = len(repr(items)) # approx - return {"success": True, "items": items, "opaque": file_to_transfer.get("opaque")} + payload = {"file_size": len(repr(items)), "items": items} + return CallbackEvent(success=True, payload=payload) except FileNotFoundFromStorageError as ex: self.log.warning("%r not found from storage", key) - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return CallbackEvent(success=False, exception=ex) except Exception as ex: # pylint: disable=broad-except self.log.exception("Problem happened when retrieving metadata: %r, %r", key, file_to_transfer) self.metrics.unexpected_exception(ex, where="handle_list") - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return CallbackEvent(success=False, exception=ex) def handle_metadata(self, site, key, file_to_transfer): try: storage = self.get_object_storage(site) metadata = storage.get_metadata_for_key(key) - file_to_transfer["file_size"] = len(repr(metadata)) # approx - return {"success": True, "metadata": metadata, "opaque": file_to_transfer.get("opaque")} + payload = {"metadata": metadata, "file_size": len(repr(metadata))} + return CallbackEvent(success=True, payload=payload) except FileNotFoundFromStorageError as ex: self.log.warning("%r not found from storage", key) - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return CallbackEvent(success=False, exception=ex) except Exception as ex: # pylint: disable=broad-except self.log.exception("Problem happened when retrieving metadata: %r, %r", key, file_to_transfer) self.metrics.unexpected_exception(ex, where="handle_metadata") - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return 
CallbackEvent(success=False, exception=ex) def handle_download(self, site, key, file_to_transfer): try: - path = file_to_transfer["target_path"] + path = file_to_transfer.destination_path self.log.info("Requesting download of object key: src=%r dst=%r", key, path) file_size, metadata = self.fetch_manager.fetch_file(site, key, path) - file_to_transfer["file_size"] = file_size - return {"success": True, "opaque": file_to_transfer.get("opaque"), "target_path": path, "metadata": metadata} + payload = {"file_size": file_size, "metadata": metadata, "target_path": path} + return CallbackEvent(success=True, opaque=file_to_transfer.opaque, payload=payload) except FileNotFoundFromStorageError as ex: self.log.warning("%r not found from storage", key) - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return CallbackEvent(success=False, exception=ex, opaque=file_to_transfer.opaque) except Exception as ex: # pylint: disable=broad-except self.log.exception("Problem happened when downloading: %r, %r", key, file_to_transfer) self.metrics.unexpected_exception(ex, where="handle_download") - return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")} + return CallbackEvent(success=False, exception=ex, opaque=file_to_transfer.opaque) def handle_upload(self, site, key, file_to_transfer): + payload = {"file_size": file_to_transfer.file_size} try: storage = self.get_object_storage(site) - unlink_local = False - if "blob" in file_to_transfer: - self.log.info("Uploading memory-blob to object store: dst=%r", key) - storage.store_file_from_memory(key, file_to_transfer["blob"], metadata=file_to_transfer["metadata"]) + unlink_local = file_to_transfer.remove_after_upload + self.log.info("Uploading file to object store: src=%r dst=%r", file_to_transfer.source_data, key) + if not isinstance(file_to_transfer.source_data, BytesIO): + f = open(file_to_transfer.source_data, "rb") else: - # Basebackups may be multipart uploads, depending on the driver. - # Swift needs to know about this so it can do possible cleanups. - multipart = file_to_transfer["filetype"] in {"basebackup", "basebackup_chunk", "basebackup_delta"} - try: - self.log.info("Uploading file to object store: src=%r dst=%r", file_to_transfer["local_path"], key) - storage.store_file_from_disk( - key, file_to_transfer["local_path"], metadata=file_to_transfer["metadata"], multipart=multipart - ) - unlink_local = True - except LocalFileIsRemoteFileError: - pass + f = file_to_transfer.source_data + with f: + metadata = file_to_transfer.metadata.copy() + if file_to_transfer.file_size: + metadata["Content-Length"] = file_to_transfer.file_size + storage.store_file_object(key, f, metadata=metadata) if unlink_local: try: - self.log.info("Deleting file: %r since it has been uploaded", file_to_transfer["local_path"]) - os.unlink(file_to_transfer["local_path"]) - metadata_path = file_to_transfer["local_path"] + ".metadata" - with suppress(FileNotFoundError): - os.unlink(metadata_path) + self.log.info("Deleting file: %r since it has been uploaded", file_to_transfer.source_data) + os.unlink(file_to_transfer.source_data) + # If we're working from pathes, then compute the .metadata + # path. 
+                    # FIXME: should be part of the event itself
+                    if isinstance(file_to_transfer.source_data, Path):
+                        metadata_path = file_to_transfer.source_data.with_name(
+                            file_to_transfer.source_data.name + ".metadata"
+                        )
+                        with suppress(FileNotFoundError):
+                            os.unlink(metadata_path)
                 except Exception as ex:  # pylint: disable=broad-except
-                    self.log.exception("Problem in deleting file: %r", file_to_transfer["local_path"])
+                    self.log.exception("Problem in deleting file: %r", file_to_transfer.source_data)
                     self.metrics.unexpected_exception(ex, where="handle_upload_unlink")
-            return {"success": True, "opaque": file_to_transfer.get("opaque")}
+            return CallbackEvent(success=True, payload=payload)
         except Exception as ex:  # pylint: disable=broad-except
-            if file_to_transfer.get("retry_number", 0) > 0:
-                self.log.exception("Problem in moving file: %r, need to retry", file_to_transfer["local_path"])
+            if file_to_transfer.retry_number > 0:
+                self.log.exception("Problem in moving file: %r, need to retry", file_to_transfer.source_data)
                 # Ignore the exception the first time round as some object stores have frequent Internal Errors
                 # and the upload usually goes through without any issues the second time round
                 self.metrics.unexpected_exception(ex, where="handle_upload")
             else:
                 self.log.warning(
-                    "Problem in moving file: %r, need to retry (%s: %s)", file_to_transfer["local_path"],
+                    "Problem in moving file: %r, need to retry (%s: %s)", file_to_transfer.source_data,
                     ex.__class__.__name__, ex
                 )
-
-            file_to_transfer["retry_number"] = file_to_transfer.get("retry_number", 0) + 1
-            if file_to_transfer["retry_number"] > self.config["upload_retries_warning_limit"]:
+            file_to_transfer = dataclasses.replace(file_to_transfer, retry_number=file_to_transfer.retry_number + 1)
+            if file_to_transfer.retry_number > self.config["upload_retries_warning_limit"]:
                 create_alert_file(self.config, "upload_retries_warning")
             # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
-            self.sleep(min(0.5 * 2 ** (file_to_transfer["retry_number"] - 1), 20))
+            self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
             self.transfer_queue.put(file_to_transfer)
-            return {"success": False, "call_callback": False, "exception": ex}
+            return None
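The requeue path above backs off with a capped exponential delay before retrying the upload. A minimal sketch of just that arithmetic (the helper name is illustrative; only the formula comes from the hunk):

    def upload_retry_delay(retry_number: int, cap: float = 20.0) -> float:
        # 0.5s, 1s, 2s, 4s, ... capped at 20s: min(0.5 * 2 ** (n - 1), 20)
        return min(0.5 * 2 ** (retry_number - 1), cap)

    assert upload_retry_delay(1) == 0.5
    assert upload_retry_delay(3) == 2.0
    assert upload_retry_delay(10) == 20.0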
diff --git a/pghoard/wal.py b/pghoard/wal.py
index bd0f04bc..aaa0ce1a 100644
--- a/pghoard/wal.py
+++ b/pghoard/wal.py
@@ -49,14 +49,14 @@ class WalBlobLengthError(ValueError):
 WalHeader = namedtuple("WalHeader", ("version", "lsn"))


-def segments_per_xlogid(server_version: int) -> int:
+def segments_per_xlogid(server_version: Optional[int]) -> int:
     if server_version is not None and server_version < 90300:
         return 0x0FFFFFFFF // WAL_SEG_SIZE
     return 0x100000000 // WAL_SEG_SIZE


 class LSN:
-    def __init__(self, value: Union[int, str], server_version: int, timeline_id: Optional[int] = None):
+    def __init__(self, value: Union[int, str], server_version: Optional[int], timeline_id: Optional[int] = None):
         self.timeline_id = timeline_id
         self.server_version = server_version
         if isinstance(value, int):
@@ -149,7 +149,7 @@ def __add__(self, other: int):

     def __sub__(self, other) -> int:
         if isinstance(other, LSN):
-            self._assert_sane_for_comparison(self, other)
+            self._assert_sane_for_comparison(other)
             val = other.lsn
         elif isinstance(other, int):
             val = other
@@ -198,7 +198,7 @@ def read_header(blob):
     return WalHeader(version=version, lsn=lsn)


-def lsn_from_sysinfo(sysinfo: tuple, pg_version: str = None) -> LSN:
+def lsn_from_sysinfo(sysinfo: tuple, pg_version: Optional[int] = None) -> LSN:
     """Get wal file name out of a IDENTIFY_SYSTEM tuple
     """
     return LSN(sysinfo[2], timeline_id=int(sysinfo[1]), server_version=pg_version)
diff --git a/pghoard/walreceiver.py b/pghoard/walreceiver.py
index dfd2059d..9d8cb842 100644
--- a/pghoard/walreceiver.py
+++ b/pghoard/walreceiver.py
@@ -18,7 +18,8 @@
     REPLICATION_PHYSICAL, PhysicalReplicationConnection
 )

-from pghoard.common import suppress
+from pghoard.common import FileType, FileTypePrefixes, suppress
+from pghoard.compressor import CompressionEvent
 from pghoard.wal import LSN, WAL_SEG_SIZE, lsn_from_sysinfo

 KEEPALIVE_INTERVAL = 10.0
@@ -81,14 +82,14 @@ def fetch_timeline_history_files(self, max_timeline):
             history_data = timeline_history[1].tobytes()
             self.log.debug("Received timeline history: %s for timeline %r", history_filename, max_timeline)

-            compression_event = {
-                "type": "CLOSE_WRITE",
-                "compress_to_memory": True,
-                "delete_file_after_compression": False,
-                "input_data": BytesIO(history_data),
-                "full_path": history_filename,
-                "site": self.site,
-            }
+            compression_event = CompressionEvent(
+                compress_to_memory=True,
+                source_data=BytesIO(history_data),
+                file_path=FileTypePrefixes[FileType.Timeline] / history_filename,
+                file_type=FileType.Timeline,
+                backup_site_key=self.site,
+                metadata={}
+            )
             self.compression_queue.put(compression_event)
             max_timeline -= 1

@@ -133,16 +134,15 @@ def switch_wal(self):
         callback_queue = Queue()
         self.callbacks[self.latest_wal_start] = callback_queue

-        compression_event = {
-            "type": "MOVE",
-            "callback_queue": callback_queue,
-            "compress_to_memory": True,
-            "delete_file_after_compression": False,
-            "input_data": wal_data,
-            "full_path": self.latest_wal,
-            "site": self.site,
-            "src_path": "{}.partial".format(self.latest_wal),
-        }
+        compression_event = CompressionEvent(
+            callback_queue=callback_queue,
+            compress_to_memory=True,
+            source_data=wal_data,
+            file_type=FileType.Wal,
+            file_path=FileTypePrefixes[FileType.Wal] / self.latest_wal,
+            backup_site_key=self.site,
+            metadata={}
+        )
         self.latest_wal = None
         self.compression_queue.put(compression_event)
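Both event producers above build the storage-relative name by joining a per-file-type prefix with the file name. FileType and FileTypePrefixes live in pghoard.common and are not shown in this patch; a rough sketch of the shape the call sites rely on, with assumed prefix values:

    # Sketch only: the member names appear in this patch, but the prefix
    # values ("xlog", "timeline") are assumptions for illustration.
    import enum
    from pathlib import Path

    class FileType(str, enum.Enum):
        Wal = "xlog"
        Timeline = "timeline"

    FileTypePrefixes = {
        FileType.Wal: Path("xlog"),
        FileType.Timeline: Path("timeline"),
    }

    # FileTypePrefixes[FileType.Wal] / "000000010000000000000002"
    # -> Path("xlog/000000010000000000000002")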
diff --git a/pghoard/webserver.py b/pghoard/webserver.py
index 0eaade53..2601f989 100644
--- a/pghoard/webserver.py
+++ b/pghoard/webserver.py
@@ -13,14 +13,17 @@
 from concurrent.futures import ThreadPoolExecutor
 from contextlib import contextmanager
 from http.server import BaseHTTPRequestHandler, HTTPServer
+from pathlib import Path
 from queue import Empty, Queue
 from socketserver import ThreadingMixIn
 from threading import RLock, Thread

 from pghoard import wal
-from pghoard.common import get_pg_wal_directory, json_encode
+from pghoard.common import (FileType, FileTypePrefixes, get_pg_wal_directory, json_encode)
+from pghoard.compressor import CompressionEvent
 from pghoard.rohmu.compat import suppress
 from pghoard.rohmu.errors import Error, FileNotFoundFromStorageError
+from pghoard.transfer import DownloadEvent, OperationEvents, TransferOperation
 from pghoard.version import __version__
@@ -222,15 +225,12 @@ def _transfer_agent_op(self, site, filename, filetype, method, *, retries=2):
         start_time = time.time()
         self.server.log.debug("Requesting site: %r, filename: %r, filetype: %r", site, filename, filetype)
-
+        filetype = FileType(filetype)
+        filepath = Path(FileTypePrefixes[filetype]) / filename
         callback_queue = Queue()
-        self.server.transfer_queue.put({
-            "callback_queue": callback_queue,
-            "filetype": filetype,
-            "local_path": filename,
-            "site": site,
-            "type": method,
-        })
+        cls = OperationEvents[method]
+        ev = cls(callback_queue=callback_queue, file_type=filetype, file_path=filepath, backup_site_key=site)
+        self.server.transfer_queue.put(ev)

         try:
             try:
@@ -238,14 +238,14 @@ def _transfer_agent_op(self, site, filename, filetype, method, *, retries=2):
                 self.server.log.debug("Handled a %s request for: %r, took: %.3fs", method, site, time.time() - start_time)
             except Empty:
                 self.server.log.exception(
-                    "Timeout on a %s request for: %r, took: %.3fs", method, site,
-                    time.time() - start_time
+                    "Timeout on a %s request for: %r, took: %.3fs %s", method, site,
+                    time.time() - start_time, ev
                 )
                 raise HttpResponse("TIMEOUT", status=500)

-            if not response["success"]:
-                if isinstance(response.get("exception"), FileNotFoundFromStorageError):
-                    raise HttpResponse("{0.__class__.__name__}: {0}".format(response["exception"]), status=404)
+            if not response.success:
+                if isinstance(response.exception, FileNotFoundFromStorageError):
+                    raise HttpResponse("{0.__class__.__name__}: {0}".format(response.exception), status=404)
                 raise HttpResponse(status=500)
         except HttpResponse as ex:
             if ex.status == 500 and retries:
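The HTTP layer now resolves the event class from OperationEvents instead of building a per-request dict. OperationEvents itself is defined in pghoard.transfer and not shown here; a sketch of the dispatch the call site assumes, with the mapping passed in so the example stays self-contained:

    from pathlib import Path
    from queue import Queue

    def make_transfer_event(operation_events: dict, method, *, site, file_type, file_path: Path):
        # operation_events maps a TransferOperation (e.g. Download, Metadata, List)
        # to the dataclass used for that request type.
        callback_queue = Queue()
        event_cls = operation_events[method]
        event = event_cls(
            callback_queue=callback_queue,
            file_type=file_type,
            file_path=file_path,
            backup_site_key=site,
        )
        return event, callback_queue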
@@ -330,41 +330,42 @@ def _create_fetch_operation(self, key, site, filetype, obname, max_age=-1, suppr
             started_at=time.monotonic(),
             target_path=target_path,
         )
-        self.server.transfer_queue.put({
-            "callback_queue": self.server.download_results,
-            "filetype": filetype,
-            "local_path": obname,
-            "opaque": key,
-            "site": site,
-            "target_path": tmp_target_path,
-            "type": "DOWNLOAD",
-        })
+        self.server.transfer_queue.put(
+            DownloadEvent(
+                callback_queue=self.server.download_results,
+                file_type=filetype,
+                file_path=FileTypePrefixes[filetype] / obname,
+                backup_site_key=site,
+                destination_path=Path(tmp_target_path),
+                opaque=key
+            )
+        )

     def _process_completed_download_operations(self, timeout=None):
         while True:
             try:
                 result = self.server.download_results.get(block=timeout is not None, timeout=timeout)
-                key = result["opaque"]
+                key = result.opaque
                 with self.server.lock:
                     op = self.server.pending_download_ops.pop(key, None)
                     if not op:
                         self.server.log.warning("Orphaned download operation %r completed: %r", key, result)
-                        if result["success"]:
+                        if result.success:
                             with suppress(OSError):
-                                os.unlink(result["target_path"])
+                                os.unlink(result.payload["target_path"])
                             continue
-                    if result["success"]:
+                    if result.success:
                         if os.path.isfile(op["target_path"]):
                             self.server.log.warning("Target path for %r already exists, skipping", key)
                             continue
-                        os.rename(result["target_path"], op["target_path"])
-                        metadata = result["metadata"] or {}
+                        os.rename(result.payload["target_path"], op["target_path"])
+                        metadata = result.payload["metadata"] or {}
                         self.server.log.info(
-                            "Renamed %s to %s. Original upload from %r, hash %s:%s", result["target_path"],
+                            "Renamed %s to %s. Original upload from %r, hash %s:%s", result.payload["target_path"],
                             op["target_path"], metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
                         )
                     else:
-                        ex = result.get("exception", Error)
+                        ex = result.exception or Error
                         if isinstance(ex, FileNotFoundFromStorageError):
                             # don't try prefetching this file again
                             self.server.prefetch_404.append(key)
@@ -492,8 +493,8 @@ def get_wal_or_timeline_file(self, site, filename, filetype):
         raise HttpResponse("TIMEOUT", status=500)

     def list_basebackups(self, site):
-        response = self._transfer_agent_op(site, "", "basebackup", "LIST")
-        raise HttpResponse({"basebackups": response["items"]}, status=200)
+        response = self._transfer_agent_op(site, "", "basebackup", TransferOperation.List)
+        raise HttpResponse({"basebackups": response.payload["items"]}, status=200)

     def handle_archival_request(self, site, filename, filetype):
         if filetype == "basebackup":
@@ -519,15 +520,20 @@ def handle_archival_request(self, site, filename, filetype):
             compress_to_memory = False
         else:
             compress_to_memory = True
-        compression_event = {
-            "type": "MOVE",
-            "callback_queue": callback_queue,
-            "compress_to_memory": compress_to_memory,
-            "delete_file_after_compression": False,
-            "full_path": xlog_path,
-            "site": site,
-            "src_path": "{}.partial".format(xlog_path),
-        }
+        if filename.endswith(".history"):
+            filetype = FileType.Timeline
+        else:
+            filetype = FileType.Wal
+        compression_event = CompressionEvent(
+            callback_queue=callback_queue,
+            compress_to_memory=compress_to_memory,
+            delete_file_after_compression=False,
+            file_path=FileTypePrefixes[filetype] / filename,
+            source_data=Path(xlog_path),
+            file_type=filetype,
+            backup_site_key=site,
+            metadata={}
+        )
         self.server.compression_queue.put(compression_event)
         try:
             response = callback_queue.get(timeout=30)
@@ -542,7 +548,7 @@
             )
             raise HttpResponse("TIMEOUT", status=500)

-        if not response["success"]:
+        if not response.success:
             raise HttpResponse(status=500)
         raise HttpResponse(status=201)

@@ -557,8 +563,8 @@ def do_HEAD(self):
         site, obtype, obname = self._parse_request(path)
         if self.headers.get("x-pghoard-target-path"):
             raise HttpResponse("x-pghoard-target-path header is only valid for downloads", status=400)
-        response = self._transfer_agent_op(site, obname, obtype, "METADATA")
-        metadata = response["metadata"]
+        response = self._transfer_agent_op(site, obname, obtype, TransferOperation.Metadata)
+        metadata = response.payload["metadata"]
         headers = {}
         if metadata.get("hash") and metadata.get("hash-algorithm"):
             headers["metadata-hash"] = metadata["hash"]
diff --git a/test/test_basebackup.py b/test/test_basebackup.py index
4582e3e3..634c5d75 100644 --- a/test/test_basebackup.py +++ b/test/test_basebackup.py @@ -230,7 +230,7 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti } pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata) result = q.get(timeout=60) - assert result["success"] + assert result.success # make sure it shows on the list Restore().run([ diff --git a/test/test_compressor.py b/test/test_compressor.py index 6d99b217..8eda79c1 100644 --- a/test/test_compressor.py +++ b/test/test_compressor.py @@ -10,6 +10,7 @@ import os import random import socket +from pathlib import Path from queue import Queue from threading import Event @@ -17,7 +18,8 @@ import pytest from pghoard import metrics -from pghoard.compressor import CompressorThread +from pghoard.common import FileType, FileTypePrefixes, QuitEvent +from pghoard.compressor import (CompressionEvent, CompressorThread, DecompressionEvent) from pghoard.rohmu import IO_BLOCK_SIZE, compressor, rohmufile from pghoard.rohmu.compressor import zstd from pghoard.rohmu.snappyfile import SnappyFile, snappy @@ -31,7 +33,6 @@ class WALTester: def __init__(self, path, name, mode): """Create a random or zero file resembling a valid WAL, bigger than block size, with a valid header.""" self.path = os.path.join(path, name) - self.path_partial = self.path + ".partial" self.contents = wal_header_for_file(name) if mode == "random": self.contents += os.urandom(IO_BLOCK_SIZE * 2) @@ -109,44 +110,10 @@ def setup_method(self, method): def teardown_method(self, method): self.compressor.running = False - self.compression_queue.put({"type": "QUIT"}) + self.compression_queue.put(QuitEvent) self.compressor.join() super().teardown_method(method) - def test_get_event_type(self): - # Rename from .partial to final should be recognized - event = { - "full_path": "/out/00000001000000000000000C", - "src_path": "/tmp/00000001000000000000000C.partial", - "type": "MOVE", - } - assert self.compressor.get_event_filetype(event) == "xlog" - # Rename from non-partial suffix is not recognized - event["src_path"] += "xyz" - assert self.compressor.get_event_filetype(event) is None - # other event types are ignored - event["type"] = "NAKKI" - assert self.compressor.get_event_filetype(event) is None - - # Timeline history files are handled the same way (do they actually ever have .partial?) 
- event = { - "full_path": "/xlog/0000000A.history", - "src_path": "/tmp/0000000A.history.partial", - "type": "MOVE", - } - assert self.compressor.get_event_filetype(event) == "timeline" - event["src_path"] += "xyz" - assert self.compressor.get_event_filetype(event) is None - del event["src_path"] - event["type"] = "CLOSE_WRITE" - assert self.compressor.get_event_filetype(event) == "timeline" - - event = { - "full_path": "/data/base.tar", - "type": "CLOSE_WRITE", - } - assert self.compressor.get_event_filetype(event) == "basebackup" - def test_write_file(self): ifile = WALTester(self.incoming_path, "00000001000000000000000D", "random") with open(ifile.path, "rb") as input_obj, io.BytesIO() as output_obj: @@ -162,7 +129,7 @@ def test_write_file(self): def test_compress_to_file_wal(self): ifile = WALTester(self.incoming_path, "00000001000000000000000F", "random") - self._test_compress_to_file("xlog", ifile.size, ifile.path, ifile.path_partial) + self._test_compress_to_file("xlog", ifile.size, ifile.path) def test_compress_to_file_history(self): file_path = os.path.join(self.incoming_path, "0000000F.history") @@ -172,18 +139,27 @@ def test_compress_to_file_history(self): out.write(contents) file_size = out.tell() - self._test_compress_to_file("timeline", file_size, file_path, file_path + ".partial") - - def _test_compress_to_file(self, filetype, file_size, file_path, file_path_partial): - self.compression_queue.put({ - "type": "MOVE", - "src_path": file_path_partial, - "full_path": file_path, - }) + self._test_compress_to_file("timeline", file_size, file_path) + + def _test_compress_to_file(self, filetype, file_size, file_path): + filetype = FileType(filetype) + file_path = Path(file_path) + dest_file_path = FileTypePrefixes[filetype] / file_path.name + self.compression_queue.put( + CompressionEvent( + file_path=dest_file_path, + source_data=file_path, + file_type=FileType(filetype), + backup_site_key=self.test_site, + callback_queue=None, + metadata={} + ) + ) transfer_event = self.transfer_queue.get(timeout=5.0) expected = { - "filetype": filetype, - "local_path": file_path.replace(self.incoming_path, self.handled_path), + "file_type": filetype, + "file_path": dest_file_path, + "source_data": self.compressor.get_compressed_file_dir(self.test_site) / dest_file_path, "metadata": { "compression-algorithm": self.algorithm, "compression-level": 0, @@ -191,27 +167,34 @@ def _test_compress_to_file(self, filetype, file_size, file_path, file_path_parti "original-file-size": file_size, "pg-version": 90500, }, - "site": self.test_site, + "backup_site_key": self.test_site, } for key, value in expected.items(): if key == "metadata" and filetype == "xlog": - assert transfer_event[key].pop("hash") - assert transfer_event[key].pop("hash-algorithm") == "sha1" - assert transfer_event[key] == value + assert transfer_event.metadata.pop("hash") + assert transfer_event.metadata.pop("hash-algorithm") == "sha1" + assert getattr(transfer_event, key) == value def test_compress_to_memory(self): ifile = WALTester(self.incoming_path, "0000000100000000000000FF", "random") - self.compression_queue.put({ - "compress_to_memory": True, - "delete_file_after_compression": False, - "full_path": ifile.path, - "src_path": ifile.path_partial, - "type": "MOVE", - }) + filetype = FileType.Wal + file_path = Path(ifile.path) + dest_file_path = FileTypePrefixes[filetype] / file_path.name + self.compression_queue.put( + CompressionEvent( + compress_to_memory=True, + file_path=dest_file_path, + source_data=file_path, + 
file_type=FileType.Wal, + backup_site_key=self.test_site, + callback_queue=None, + metadata={} + ) + ) expected = { "callback_queue": None, - "filetype": "xlog", - "local_path": ifile.path, + "file_type": FileType.Wal, + "file_path": dest_file_path, "metadata": { "compression-algorithm": self.algorithm, "compression-level": 0, @@ -219,16 +202,16 @@ def test_compress_to_memory(self): "original-file-size": ifile.size, "pg-version": 90500, }, - "site": self.test_site, + "backup_site_key": self.test_site, } transfer_event = self.transfer_queue.get(timeout=3.0) for key, value in expected.items(): if key == "metadata": - assert transfer_event[key].pop("hash") - assert transfer_event[key].pop("hash-algorithm") == "sha1" - assert transfer_event[key] == value - - result = self.decompress(transfer_event["blob"]) + assert transfer_event.metadata.pop("hash") + assert transfer_event.metadata.pop("hash-algorithm") == "sha1" + assert getattr(transfer_event, key) == value + assert isinstance(transfer_event.source_data, io.BytesIO) + result = self.decompress(transfer_event.source_data.getvalue()) assert result[:100] == ifile.contents[:100] assert result == ifile.contents @@ -258,13 +241,18 @@ def test_compress_error_retry(self, side_effects, is_failure): test_compressor.start() ifile = WALTester(self.incoming_path, "0000000100000000000000FF", "random") assert not test_compressor.critical_failure_event.is_set(), "Critical failure event shouldn't be set yet" - compression_queue.put({ - "compress_to_memory": True, - "delete_file_after_compression": False, - "full_path": ifile.path, - "src_path": ifile.path_partial, - "type": "MOVE", - }) + file_path = Path(ifile.path) + compression_queue.put( + CompressionEvent( + compress_to_memory=True, + file_path=FileTypePrefixes[FileType.Wal] / file_path.name, + source_data=file_path, + backup_site_key=self.test_site, + file_type=FileType.Wal, + callback_queue=None, + metadata={} + ) + ) test_compressor.critical_failure_event.wait(5) if is_failure: assert test_compressor.critical_failure_event.is_set(), "Critical failure event should be set" @@ -281,19 +269,23 @@ def test_compress_error_retry(self, side_effects, is_failure): def test_compress_encrypt_to_memory(self): ifile = WALTester(self.incoming_path, "0000000100000000000000FB", "random") + file_path = Path(ifile.path) + dest_file_path = FileTypePrefixes[FileType.Wal] / file_path.name self.compressor.config["backup_sites"][self.test_site]["encryption_key_id"] = "testkey" - event = { - "compress_to_memory": True, - "delete_file_after_compression": False, - "full_path": ifile.path, - "src_path": ifile.path_partial, - "type": "MOVE", - } - self.compressor.handle_event(event, filetype="xlog") + event = CompressionEvent( + compress_to_memory=True, + file_path=dest_file_path, + source_data=file_path, + backup_site_key=self.test_site, + file_type=FileType.Wal, + callback_queue=None, + metadata={} + ) + self.compressor.handle_event(event) expected = { "callback_queue": None, - "filetype": "xlog", - "local_path": ifile.path, + "file_type": "xlog", + "file_path": dest_file_path, "metadata": { "compression-algorithm": self.algorithm, "compression-level": 0, @@ -302,32 +294,35 @@ def test_compress_encrypt_to_memory(self): "original-file-size": ifile.size, "pg-version": 90500, }, - "site": self.test_site, + "backup_site_key": self.test_site, } transfer_event = self.transfer_queue.get(timeout=5.0) for key, value in expected.items(): if key == "metadata": - assert transfer_event[key].pop("hash") - assert 
transfer_event[key].pop("hash-algorithm") == "sha1" - assert transfer_event[key] == value + assert transfer_event.metadata.pop("hash") + assert transfer_event.metadata.pop("hash-algorithm") == "sha1" + assert getattr(transfer_event, key) == value def test_archive_command_compression(self): zero = WALTester(self.incoming_path, "00000001000000000000000D", "zero") + file_path = Path(zero.path) + dest_file_path = FileTypePrefixes[FileType.Wal] / file_path.name callback_queue = Queue() - event = { - "callback_queue": callback_queue, - "compress_to_memory": True, - "delete_file_after_compression": False, - "full_path": zero.path, - "src_path": zero.path_partial, - "type": "MOVE", - } + event = CompressionEvent( + callback_queue=callback_queue, + compress_to_memory=True, + file_path=dest_file_path, + source_data=file_path, + backup_site_key=self.test_site, + file_type=FileType.Wal, + metadata={} + ) self.compression_queue.put(event) transfer_event = self.transfer_queue.get(timeout=5.0) expected = { "callback_queue": callback_queue, - "filetype": "xlog", - "local_path": zero.path, + "file_type": "xlog", + "file_path": dest_file_path, "metadata": { "compression-algorithm": self.algorithm, "compression-level": 0, @@ -335,35 +330,40 @@ def test_archive_command_compression(self): "original-file-size": zero.size, "pg-version": 90500, }, - "site": self.test_site, + "backup_site_key": self.test_site, } for key, value in expected.items(): if key == "metadata": - assert transfer_event[key].pop("hash") - assert transfer_event[key].pop("hash-algorithm") == "sha1" - assert transfer_event[key] == value - - assert self.decompress(transfer_event["blob"]) == zero.contents + assert transfer_event.metadata.pop("hash") + assert transfer_event.metadata.pop("hash-algorithm") == "sha1" + assert getattr(transfer_event, key) == value + assert isinstance(transfer_event.source_data, io.BytesIO) + assert self.decompress(transfer_event.source_data.getvalue()) == zero.contents def test_decompression_event(self): ifile = WALTester(self.incoming_path, "00000001000000000000000A", "random") callback_queue = Queue() local_filepath = os.path.join(self.temp_dir, "00000001000000000000000A") - self.compression_queue.put({ - "blob": self.compress(ifile.contents), - "callback_queue": callback_queue, - "filetype": "xlog", - "local_path": local_filepath, - "metadata": { - "compression-algorithm": self.algorithm, - "compression-level": 0, - "host": socket.gethostname(), - "original-file-size": ifile.size, - "pg-version": 90500, - }, - "site": self.test_site, - "type": "DECOMPRESSION", - }) + file_path = Path(local_filepath) + dest_file_path = FileTypePrefixes[FileType.Wal] / file_path.name + data = io.BytesIO(self.compress(ifile.contents)) + self.compression_queue.put( + DecompressionEvent( + source_data=data, + file_path=dest_file_path, + callback_queue=callback_queue, + file_type=FileType.Wal, + destination_path=file_path, + metadata={ + "compression-algorithm": self.algorithm, + "compression-level": 0, + "host": socket.gethostname(), + "original-file-size": ifile.size, + "pg-version": 90500, + }, + backup_site_key=self.test_site, + ) + ) callback_queue.get(timeout=5.0) assert os.path.exists(local_filepath) is True with open(local_filepath, "rb") as fp: @@ -385,22 +385,24 @@ def test_decompression_decrypt_event(self): ) callback_queue = Queue() local_filepath = os.path.join(self.temp_dir, "00000001000000000000000E") - self.compression_queue.put({ - "blob": output_obj.getvalue(), - "callback_queue": callback_queue, - "filetype": "xlog", - 
"local_path": local_filepath, - "metadata": { - "compression-algorithm": self.algorithm, - "compression-level": 0, - "encryption-key-id": "testkey", - "host": socket.gethostname(), - "original-file-size": ifile.size, - "pg-version": 90500, - }, - "site": self.test_site, - "type": "DECOMPRESSION", - }) + self.compression_queue.put( + DecompressionEvent( + source_data=output_obj, + callback_queue=callback_queue, + file_type=FileType.Wal, + file_path=FileTypePrefixes[FileType.Wal] / Path(local_filepath).name, + destination_path=Path(local_filepath), + metadata={ + "compression-algorithm": self.algorithm, + "compression-level": 0, + "encryption-key-id": "testkey", + "host": socket.gethostname(), + "original-file-size": ifile.size, + "pg-version": 90500, + }, + backup_site_key=self.test_site, + ) + ) callback_queue.get(timeout=5.0) assert os.path.exists(local_filepath) is True with open(local_filepath, "rb") as fp: diff --git a/test/test_transferagent.py b/test/test_transferagent.py index ff7ca5ee..a5d53809 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -6,14 +6,16 @@ """ import os import time +from pathlib import Path from queue import Empty, Queue from unittest.mock import Mock import pytest from pghoard import metrics +from pghoard.common import CallbackEvent, FileType, QuitEvent from pghoard.rohmu.errors import FileNotFoundFromStorageError, StorageError -from pghoard.transfer import TransferAgent +from pghoard.transfer import DownloadEvent, TransferAgent, UploadEvent # pylint: disable=attribute-defined-outside-init from .base import PGHoardTestCase @@ -34,6 +36,9 @@ def get_contents_to_string(self, key): # pylint: disable=unused-argument def store_file_from_disk(self, key, local_path, metadata, multipart=None): # pylint: disable=unused-argument raise StorageError("foo") + def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None): + raise StorageError("foo") + class TestTransferAgent(PGHoardTestCase): def setup_method(self, method): @@ -63,7 +68,6 @@ def setup_method(self, method): self.transfer_queue = Queue() self.transfer_agent = TransferAgent( config=self.config, - compression_queue=self.compression_queue, mp_manager=None, transfer_queue=self.transfer_queue, metrics=metrics.Metrics(statsd={}), @@ -73,44 +77,45 @@ def setup_method(self, method): def teardown_method(self, method): self.transfer_agent.running = False - self.transfer_queue.put({"type": "QUIT"}) + self.transfer_queue.put(QuitEvent) self.transfer_agent.join() super().teardown_method(method) def test_handle_download(self): callback_queue = Queue() self.transfer_agent.get_object_storage = MockStorage() - self.transfer_queue.put({ - "callback_queue": callback_queue, - "filetype": "xlog", - "local_path": self.temp_dir, - "opaque": 42, - "site": self.test_site, - "target_path": self.temp_dir, - "type": "DOWNLOAD", - }) + self.transfer_queue.put( + DownloadEvent( + callback_queue=callback_queue, + file_type=FileType.Wal, + destination_path=self.temp_dir, + file_path=Path("nonexistent/file"), + opaque=42, + backup_site_key=self.test_site + ) + ) event = callback_queue.get(timeout=1.0) - assert event["success"] is False - assert event["opaque"] == 42 - assert isinstance(event["exception"], FileNotFoundFromStorageError) + assert event.success is False + assert event.opaque == 42 + assert isinstance(event.exception, FileNotFoundFromStorageError) def test_handle_upload_xlog(self): callback_queue = Queue() storage = MockStorage() 
self.transfer_agent.get_object_storage = storage assert os.path.exists(self.foo_path) is True - self.transfer_queue.put({ - "callback_queue": callback_queue, - "file_size": 3, - "filetype": "xlog", - "local_path": self.foo_path, - "metadata": { - "start-wal-segment": "00000001000000000000000C" - }, - "site": self.test_site, - "type": "UPLOAD", - }) - assert callback_queue.get(timeout=1.0) == {"success": True, "opaque": None} + self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_type=FileType.Wal, + file_path=Path("xlog/00000001000000000000000C"), + file_size=3, + source_data=Path(self.foo_path), + metadata={"start-wal-segment": "00000001000000000000000C"}, + backup_site_key=self.test_site + ) + ) + assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3}) assert os.path.exists(self.foo_path) is False def test_handle_upload_basebackup(self): @@ -118,18 +123,18 @@ def test_handle_upload_basebackup(self): storage = MockStorage() self.transfer_agent.get_object_storage = storage assert os.path.exists(self.foo_path) is True - self.transfer_queue.put({ - "callback_queue": callback_queue, - "file_size": 3, - "filetype": "basebackup", - "local_path": self.foo_basebackup_path, - "metadata": { - "start-wal-segment": "00000001000000000000000C" - }, - "site": self.test_site, - "type": "UPLOAD", - }) - assert callback_queue.get(timeout=1.0) == {"success": True, "opaque": None} + self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_type=FileType.Basebackup, + file_path=Path("basebackup/2015-04-15_0"), + file_size=3, + source_data=Path(self.foo_basebackup_path), + metadata={"start-wal-segment": "00000001000000000000000C"}, + backup_site_key=self.test_site + ) + ) + assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3}) assert os.path.exists(self.foo_basebackup_path) is False @pytest.mark.timeout(10) @@ -145,17 +150,17 @@ def sleep(sleep_amount): self.transfer_agent.sleep = sleep self.transfer_agent.get_object_storage = storage assert os.path.exists(self.foo_path) is True - self.transfer_queue.put({ - "callback_queue": callback_queue, - "file_size": 3, - "filetype": "xlog", - "local_path": self.foo_path, - "metadata": { - "start-wal-segment": "00000001000000000000000C" - }, - "site": self.test_site, - "type": "UPLOAD", - }) + self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_type=FileType.Wal, + file_path=Path("xlog/00000001000000000000000C"), + file_size=3, + source_data=Path(self.foo_path), + backup_site_key=self.test_site, + metadata={} + ) + ) while len(sleeps) < 8: with pytest.raises(Empty): callback_queue.get(timeout=0.01) diff --git a/test/test_wal_file_deleter.py b/test/test_wal_file_deleter.py index 83e758e9..b242eaa2 100644 --- a/test/test_wal_file_deleter.py +++ b/test/test_wal_file_deleter.py @@ -1,13 +1,15 @@ # Copyright (c) 2021 Aiven, Helsinki, Finland. 
https://aiven.io/ import time +from pathlib import Path from queue import Queue import mock import pytest from pghoard import metrics -from pghoard.compressor import WALFileDeleterThread +from pghoard.common import QuitEvent +from pghoard.compressor import WALFileDeleterThread, WalFileDeletionEvent # too fool the @@ -31,16 +33,12 @@ def fixture_wal_file_deleter(mocker): deleter.start() yield deleter deleter.running = False - deleter_queue.put({"type": "QUIT"}) + deleter_queue.put(QuitEvent) deleter.join() def make_event(path: str, site: str = "a"): - return { - "type": "delete_file", - "site": site, - "local_path": path, - } + return WalFileDeletionEvent(backup_site_key=site, file_path=Path(path)) TEST_WAIT_TIME = 0.1 @@ -51,91 +49,50 @@ def test_wal_file_deleter_happy_path(wal_file_deleter: WALFileDeleterThreadPatch wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001")) time.sleep(TEST_WAIT_TIME) assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000001"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000001")} wal_file_deleter.os_unlink_mock.assert_not_called() wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000002")) time.sleep(TEST_WAIT_TIME) assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - wal_file_deleter.os_unlink_mock.assert_called_once_with("AA000001") + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + wal_file_deleter.os_unlink_mock.assert_called_once_with(Path("AA000001")) wal_file_deleter.os_unlink_mock.reset_mock() wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001")) time.sleep(TEST_WAIT_TIME) assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - wal_file_deleter.os_unlink_mock.assert_called_once_with("AA000001") + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + wal_file_deleter.os_unlink_mock.assert_called_once_with(Path("AA000001")) # Even if there are multiple files in the list, we delete all but the latest wal_file_deleter.os_unlink_mock.reset_mock() - wal_file_deleter.to_be_deleted_files["a"].add("AA000004") - wal_file_deleter.to_be_deleted_files["a"].add("AA000003") + wal_file_deleter.to_be_deleted_files["a"].add(Path("AA000004")) + wal_file_deleter.to_be_deleted_files["a"].add(Path("AA000003")) wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001")) time.sleep(TEST_WAIT_TIME) assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000004"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000004")} assert wal_file_deleter.os_unlink_mock.call_count == 3 def test_survive_problems(wal_file_deleter: WALFileDeleterThreadPatched): - - # We survive a non-existing local_path attribute - wal_file_deleter.wal_file_deletion_queue.put({ - "type": "delete_file", - "site": "a", - "local_path_MISSING": "path", - }) - time.sleep(TEST_WAIT_TIME) - assert len(wal_file_deleter.to_be_deleted_files) == 0 - assert wal_file_deleter.running - - # we have to have a type - wal_file_deleter.wal_file_deletion_queue.put({ - "type_MISSING": "delete_file", - "site": "a", - "local_path": "AA000001", - }) - time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.running - assert len(wal_file_deleter.to_be_deleted_files) == 0 - - # the type does matter - wal_file_deleter.wal_file_deletion_queue.put({ - 
"type": "DOES MATTER", - "site": "a", - "local_path": "AA000001", - }) - time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.running - assert len(wal_file_deleter.to_be_deleted_files) == 0 - - # site must not be missing - wal_file_deleter.wal_file_deletion_queue.put({ - "type": "delete_file", - "site_MISSING": "a", - "local_path": "AA000001", - }) - time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.running - assert len(wal_file_deleter.to_be_deleted_files) == 0 - # Adding the same path twice will still result in that file not deleted wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001")) wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001")) time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.running + assert wal_file_deleter.is_alive() wal_file_deleter.os_unlink_mock.assert_not_called() assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000001"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000001")} # we survive not finding the file during deletion and the to be deleted ("older") file is still removed from the queue wal_file_deleter.os_unlink_mock.side_effect = FileNotFoundError("foo") wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000002")) time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.running + assert wal_file_deleter.is_alive() assert len(wal_file_deleter.to_be_deleted_files["a"]) == 1 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} def test_multiple_sites(wal_file_deleter: WALFileDeleterThreadPatched): @@ -147,8 +104,8 @@ def test_multiple_sites(wal_file_deleter: WALFileDeleterThreadPatched): assert wal_file_deleter.running wal_file_deleter.os_unlink_mock.assert_not_called() assert len(wal_file_deleter.to_be_deleted_files) == 2 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000001"} - assert wal_file_deleter.to_be_deleted_files["b"] == {"AA000001"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000001")} + assert wal_file_deleter.to_be_deleted_files["b"] == {Path("AA000001")} # advance one site wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000002", site="a")) @@ -156,8 +113,8 @@ def test_multiple_sites(wal_file_deleter: WALFileDeleterThreadPatched): assert wal_file_deleter.running assert wal_file_deleter.os_unlink_mock.call_count == 1 assert len(wal_file_deleter.to_be_deleted_files) == 2 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - assert wal_file_deleter.to_be_deleted_files["b"] == {"AA000001"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + assert wal_file_deleter.to_be_deleted_files["b"] == {Path("AA000001")} # Should do nothing wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001", site="b")) @@ -165,8 +122,8 @@ def test_multiple_sites(wal_file_deleter: WALFileDeleterThreadPatched): assert wal_file_deleter.running assert wal_file_deleter.os_unlink_mock.call_count == 1 assert len(wal_file_deleter.to_be_deleted_files) == 2 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - assert wal_file_deleter.to_be_deleted_files["b"] == {"AA000001"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + assert wal_file_deleter.to_be_deleted_files["b"] == {Path("AA000001")} # now advance it on site b wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000003", site="b")) @@ -174,13 +131,13 @@ def 
test_multiple_sites(wal_file_deleter: WALFileDeleterThreadPatched): assert wal_file_deleter.running #assert wal_file_deleter.os_unlink_mock.call_count == 2 assert len(wal_file_deleter.to_be_deleted_files) == 2 - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - assert wal_file_deleter.to_be_deleted_files["b"] == {"AA000003"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + assert wal_file_deleter.to_be_deleted_files["b"] == {Path("AA000003")} wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000001", site="c")) wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000002", site="c")) wal_file_deleter.wal_file_deletion_queue.put(make_event("AA000003", site="c")) time.sleep(TEST_WAIT_TIME) - assert wal_file_deleter.to_be_deleted_files["a"] == {"AA000002"} - assert wal_file_deleter.to_be_deleted_files["b"] == {"AA000003"} - assert wal_file_deleter.to_be_deleted_files["c"] == {"AA000003"} + assert wal_file_deleter.to_be_deleted_files["a"] == {Path("AA000002")} + assert wal_file_deleter.to_be_deleted_files["b"] == {Path("AA000003")} + assert wal_file_deleter.to_be_deleted_files["c"] == {Path("AA000003")} diff --git a/test/test_webserver.py b/test/test_webserver.py index 36f5c3d3..4451ebbd 100644 --- a/test/test_webserver.py +++ b/test/test_webserver.py @@ -123,7 +123,7 @@ def _run_and_wait_basebackup(self, pghoard, db, mode): pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = mode pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q) result = q.get(timeout=60) - assert result["success"] + assert result.success backups_after = set(f for f in os.listdir(backup_dir) if not f.endswith(".metadata")) new_backups = backups_after - backups_before assert len(new_backups) == 1
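The deleter tests above all rely on one invariant: per backup site, only the newest WAL path handed to the deleter is kept, everything older is unlinked, and a missing file is not fatal. A simplified sketch of that bookkeeping (not the actual WALFileDeleterThread from pghoard.compressor, which additionally handles the deletion queue, logging and metrics):

    import os
    from collections import defaultdict
    from pathlib import Path

    class ToBeDeletedTracker:
        def __init__(self):
            self.to_be_deleted_files = defaultdict(set)

        def deletion_requested(self, site: str, file_path: Path) -> None:
            files = self.to_be_deleted_files[site]
            files.add(file_path)
            newest = max(files)  # WAL file names sort chronologically
            for path in sorted(files - {newest}):
                try:
                    os.unlink(path)
                except FileNotFoundError:
                    pass  # already gone; the tests expect this to be survivable
            self.to_be_deleted_files[site] = {newest}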