diff --git a/Makefile b/Makefile
index 2ac0ce5d..68d60d82 100644
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ mypy: version
 .PHONY: fmt
 fmt: version
 	unify --quote '"' --recursive --in-place $(PYTHON_SOURCE_DIRS)
-	isort --recursive $(PYTHON_SOURCE_DIRS)
+	isort $(PYTHON_SOURCE_DIRS)
 	yapf --parallel --recursive --in-place $(PYTHON_SOURCE_DIRS)
 
 .PHONY: coverage
diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 76c97cb1..ecc70ec2 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -29,9 +29,9 @@
 from pghoard.basebackup.chunks import ChunkUploader, DeltaStats
 from pghoard.basebackup.delta import DeltaBaseBackup
 from pghoard.common import (
-    BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData, FileType, NoException,
-    PGHoardThread, connection_string_using_pgpass, download_backup_meta_file, extract_pghoard_bb_v2_metadata,
-    replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
+    TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
+    FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
+    extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
 from pghoard.compressor import CompressionEvent
@@ -397,7 +397,7 @@ def run_basic_basebackup(self):
     def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label):
         mtime = time.time()
         blob = io.BytesIO(common.json_encode(metadata, binary=True))
-        ti = tarfile.TarInfo(name=".pghoard_tar_metadata.json")
+        ti = tarfile.TarInfo(name=TAR_METADATA_FILENAME)
         ti.size = len(blob.getbuffer())
         ti.mtime = mtime
         yield ti, blob, False
diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py
index 575fd723..1618aba7 100644
--- a/pghoard/basebackup/chunks.py
+++ b/pghoard/basebackup/chunks.py
@@ -12,15 +12,15 @@
 from queue import Empty
 from tempfile import NamedTemporaryFile
 from types import TracebackType
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
+from typing import (Any, Callable, Dict, List, Optional, Tuple, Type, Union, cast)
 
 from rohmu import rohmufile
 from rohmu.delta.common import EMBEDDED_FILE_SIZE
 
 # pylint: disable=superfluous-parens
 from pghoard.common import (
-    BackupFailure, BaseBackupFormat, CallbackEvent, CallbackQueue, CompressionData, EncryptionData, FileType,
-    FileTypePrefixes, NoException
+    BackupFailure, BaseBackupFormat, CallbackEvent, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName,
+    FileType, FileTypePrefixes, NoException
 )
 from pghoard.metrics import Metrics
 from pghoard.transfer import TransferQueue, UploadEvent
@@ -144,22 +144,23 @@ def tar_one_file(
         start_time = time.monotonic()
 
         with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
+            raw_output_file = cast(FileLikeWithName, raw_output_obj)
             # pylint: disable=bad-continuation
             with rohmufile.file_writer(
                 compression_algorithm=self.compression_data.algorithm,
                 compression_level=self.compression_data.level,
                 compression_threads=self.site_config["basebackup_compression_threads"],
                 rsa_public_key=self.encryption_data.rsa_public_key,
-                fileobj=raw_output_obj
+                fileobj=raw_output_file
             ) as output_obj:
                 with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
                     self.write_files_to_tar(files=files_to_backup, tar=output_tar, delta_stats=delta_stats)
 
                 input_size = output_obj.tell()
 
-            result_size = raw_output_obj.tell()
+            result_size = raw_output_file.tell()
             # Make the file persist over the with-block with this hardlink
-            os.link(raw_output_obj.name, chunk_path)
+            os.link(raw_output_file.name, chunk_path)
 
         rohmufile.log_compression_result(
             encrypted=bool(self.encryption_data.encryption_key_id),
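Note on the `cast(FileLikeWithName, raw_output_obj)` lines above: `NamedTemporaryFile()` returns a `_TemporaryFileWrapper`, which type checkers do not accept as rohmu's `FileLike` even though it forwards the relevant methods and carries a `.name`. The cast is a runtime no-op that only narrows the static type. A minimal sketch with a local stand-in Protocol (the real one is defined in `pghoard.common` later in this patch):

```python
from tempfile import NamedTemporaryFile
from typing import Protocol, cast


class FileLikeWithName(Protocol):
    """Local stand-in for pghoard.common.FileLikeWithName."""
    @property
    def name(self) -> str:
        ...

    def write(self, data: bytes) -> int:
        ...

    def tell(self) -> int:
        ...


with NamedTemporaryFile(suffix=".tmp") as raw_output_obj:
    # cast() does nothing at runtime; it only tells the type checker that this
    # object has both the file methods and a usable .name attribute.
    raw_output_file = cast(FileLikeWithName, raw_output_obj)
    raw_output_file.write(b"example payload")
    print(raw_output_file.name, raw_output_file.tell())  # <tmp path> 15
```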
diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index edb3b353..a5311dcd 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -14,20 +14,19 @@
 from pathlib import Path
 from queue import Empty
 from tempfile import NamedTemporaryFile
-from typing import AbstractSet, Any, Callable, Dict, Iterable, List, Set, Tuple
+from typing import (AbstractSet, Any, Callable, Dict, Iterable, List, Protocol, Set, Tuple, cast)
 
 from rohmu import BaseTransfer, rohmufile
 from rohmu.dates import now
-from rohmu.delta.common import (
-    BackupManifest, BackupPath, SizeLimitedFile, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult
-)
+from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
 from rohmu.delta.snapshot import Snapshotter
 from rohmu.errors import FileNotFoundFromStorageError
+from rohmu.typing import HasRead, HasSeek
 
 from pghoard.basebackup.chunks import ChunkUploader
 from pghoard.common import (
-    BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileType, FileTypePrefixes,
-    download_backup_meta_file, extract_pghoard_delta_metadata
+    BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
+    FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
 )
 from pghoard.metrics import Metrics
 from pghoard.transfer import TransferQueue, UploadEvent
@@ -40,6 +39,10 @@ class UploadedFilesMetric:
     count: int = 0
 
 
+class HasReadAndSeek(HasRead, HasSeek, Protocol):
+    ...
+
+
 FilesChunk = Set[Tuple]
 SnapshotFiles = Dict[str, SnapshotFile]
 
@@ -114,7 +117,7 @@ def _list_existing_files(self) -> SnapshotFiles:
         return all_snapshot_files
 
     def _delta_upload_hexdigest(
-        self, *, temp_dir: Path, chunk_path: Path, file_obj: SizeLimitedFile, callback_queue: CallbackQueue,
+        self, *, temp_dir: Path, chunk_path: Path, file_obj: HasReadAndSeek, callback_queue: CallbackQueue,
         relative_path: Path
     ) -> Tuple[int, int, str, bool]:
         """Schedule a separate delta file for the upload, calculates the final hash to use it as a name
@@ -131,9 +134,10 @@ def progress_callback(n_bytes: int = 1) -> None:
             self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})
 
         with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
+            raw_output_file = cast(FileLikeWithName, raw_output_obj)
             rohmufile.write_file(
                 input_obj=file_obj,
-                output_obj=raw_output_obj,
+                output_obj=raw_output_file,
                 compression_algorithm=self.compression_data.algorithm,
                 compression_level=self.compression_data.level,
                 rsa_public_key=self.encryption_data.rsa_public_key,
@@ -141,8 +145,8 @@ def progress_callback(n_bytes: int = 1) -> None:
                 data_callback=result_hash.update,
                 progress_callback=progress_callback,
             )
-            result_size = raw_output_obj.tell()
-            raw_output_obj.seek(0)
+            result_size = raw_output_file.tell()
+            raw_output_file.seek(0)
 
             result_digest = result_hash.hexdigest()
 
@@ -157,7 +161,7 @@ def progress_callback(n_bytes: int = 1) -> None:
             else:
                 self.submitted_hashes.add(result_digest)
 
-                os.link(raw_output_obj.name, chunk_path)
+                os.link(raw_output_file.name, chunk_path)
 
         rohmufile.log_compression_result(
             encrypted=bool(self.encryption_data.encryption_key_id),
@@ -333,6 +337,9 @@ def _split_files_for_upload(
         current_chunk_size: int = 0
         current_chunk: FilesChunk = set()
 
+        if snapshot_result.state is None:
+            raise BackupFailure("Snapshot result state is None")
+
         for snapshot_file in snapshot_result.state.files:
             if not snapshot_file.should_be_bundled:
                 if snapshot_file.hexdigest:
@@ -378,6 +385,9 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
         digests_metric = UploadedFilesMetric()
         embed_metric = UploadedFilesMetric()
 
+        if snapshot_result.state is None:
+            raise BackupFailure("Snapshot result state is None")
+
         for snapshot_file in snapshot_result.state.files:
             # Sizes of files uploaded as chunks are calculated separately
             if snapshot_file.should_be_bundled:
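The `HasReadAndSeek` addition combines two structural types: inheriting from several Protocols (with `Protocol` itself listed last) produces an intersection that any object offering both members satisfies, which is looser and more honest than the old concrete `SizeLimitedFile` annotation. A self-contained sketch, using local stand-ins for rohmu's `HasRead`/`HasSeek`:

```python
import io
from typing import Protocol


class HasRead(Protocol):  # stand-in for rohmu.typing.HasRead
    def read(self, n: int = -1) -> bytes:
        ...


class HasSeek(Protocol):  # stand-in for rohmu.typing.HasSeek
    def seek(self, offset: int, whence: int = 0) -> int:
        ...


class HasReadAndSeek(HasRead, HasSeek, Protocol):
    """Intersection: anything offering both read() and seek() qualifies."""


def rewind_and_read(file_obj: HasReadAndSeek) -> bytes:
    file_obj.seek(0)
    return file_obj.read()


print(rewind_and_read(io.BytesIO(b"delta chunk")))  # b'delta chunk'
```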
diff --git a/pghoard/common.py b/pghoard/common.py
index d3874ffb..ee444b8c 100644
--- a/pghoard/common.py
+++ b/pghoard/common.py
@@ -22,16 +22,23 @@
 from pathlib import Path
 from queue import Queue
 from threading import Thread
-from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Optional, Tuple)
+from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)
 
 from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
 from rohmu.errors import Error, InvalidConfigurationError
+from rohmu.typing import FileLike, HasName
 
 from pghoard import pgutil
 
+TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
+
 LOG = logging.getLogger("pghoard.common")
 
 
+class FileLikeWithName(FileLike, HasName, Protocol):
+    ...
+
+
 class StrEnum(str, enum.Enum):
     def __str__(self):
         return str(self.value)
@@ -306,24 +313,30 @@ def delete_alert_file(config, filename):
         os.unlink(filepath)
 
 
-def _extract_metadata(fileobj):
+def _extract_metadata(fileobj: BinaryIO) -> Dict[str, Any]:
     # | in mode to use tarfile's internal stream buffer manager, currently required because our SnappyFile
     # interface doesn't do proper buffering for reads
     with tarfile.open(fileobj=fileobj, mode="r|", bufsize=IO_BLOCK_SIZE) as tar:
         for tarinfo in tar:
-            if tarinfo.name == ".pghoard_tar_metadata.json":
-                tar_meta_bytes = tar.extractfile(tarinfo).read()
+            if tarinfo.name == TAR_METADATA_FILENAME:
+                tar_extracted = tar.extractfile(tarinfo)
+                if tar_extracted is None:
+                    raise Exception(
+                        f"{TAR_METADATA_FILENAME} is not a regular file, there is no data associated to it. "
+                        "Is it a directory?"
+                    )
+                tar_meta_bytes = tar_extracted.read()
                 return json.loads(tar_meta_bytes.decode("utf-8"))
 
-    raise Exception(".pghoard_tar_metadata.json not found")
+    raise Exception(f"{TAR_METADATA_FILENAME} not found")
 
 
-def extract_pghoard_bb_v2_metadata(fileobj):
-    return _extract_metadata(fileobj)
+def extract_pghoard_bb_v2_metadata(fileobj: FileLike) -> Dict[str, Any]:
+    return _extract_metadata(cast(BinaryIO, fileobj))
 
 
-def extract_pghoard_delta_metadata(fileobj):
-    return _extract_metadata(fileobj)
+def extract_pghoard_delta_metadata(fileobj: FileLike) -> Dict[str, Any]:
+    return _extract_metadata(cast(BinaryIO, fileobj))
 
 
 def get_pg_wal_directory(config):
@@ -423,7 +436,7 @@ def from_config(config) -> "CompressionData":
 
 def download_backup_meta_file(
     storage: BaseTransfer, basebackup_path: str, metadata: Dict[str, Any], key_lookup: Callable[[str], str],
-    extract_meta_func: Callable[[BinaryIO], Dict[str, Any]]
+    extract_meta_func: Callable[[FileLike], Dict[str, Any]]
 ) -> Tuple[Dict[str, Any], bytes]:
     bmeta_compressed = storage.get_contents_to_string(basebackup_path)[0]
     with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=metadata, key_lookup=key_lookup) as input_obj:
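The new `is None` guard exists because `TarFile.extractfile()` returns a file object only for regular members; for a directory (or other special member) it returns `None`, and the old code would have failed with an opaque `AttributeError` instead of the explicit error raised here. A quick demonstration:

```python
import io
import tarfile

buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    member = tarfile.TarInfo(name="some_dir")
    member.type = tarfile.DIRTYPE  # add a directory entry, which carries no file data
    tar.addfile(member)

buf.seek(0)
with tarfile.open(fileobj=buf, mode="r|") as tar:  # streaming mode, as in _extract_metadata
    for tarinfo in tar:
        print(tarinfo.name, tar.extractfile(tarinfo))  # some_dir None
```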
diff --git a/pghoard/restore.py b/pghoard/restore.py
index 9f78e33d..a9b7836f 100644
--- a/pghoard/restore.py
+++ b/pghoard/restore.py
@@ -34,7 +34,8 @@
 from rohmu.errors import (Error, InvalidConfigurationError, MaybeRecoverableError)
 
 from pghoard.common import (
-    BaseBackupFormat, FileType, FileTypePrefixes, StrEnum, download_backup_meta_file, extract_pghoard_delta_metadata
+    TAR_METADATA_FILENAME, BaseBackupFormat, FileType, FileTypePrefixes, StrEnum, download_backup_meta_file,
+    extract_pghoard_delta_metadata
 )
 from pghoard.object_store import (HTTPRestore, ObjectStore, print_basebackup_list)
 
@@ -932,7 +933,7 @@ def _build_tar_args(self, metadata):
         if not file_format:
             return base_args
         elif file_format in {BaseBackupFormat.v1, BaseBackupFormat.v2, BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2}:
-            extra_args = ["--exclude", ".pghoard_tar_metadata.json", "--transform", "s,^pgdata/,,"]
+            extra_args = ["--exclude", TAR_METADATA_FILENAME, "--transform", "s,^pgdata/,,"]
             if file_format in {BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2}:
                 extra_args += ["--exclude", ".manifest.json"]
             if self.tablespaces:
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index c1510480..7ed73f8f 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -20,6 +20,7 @@
 from rohmu import get_transfer
 from rohmu.errors import FileNotFoundFromStorageError
 from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType)
+from rohmu.typing import Metadata
 
 from pghoard.common import (
     CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
@@ -50,7 +51,7 @@ class BaseTransferEvent:
 @dataclass(frozen=True)
 class UploadEvent(BaseTransferEvent):
     source_data: Union[BinaryIO, Path]
-    metadata: Dict[str, str]
+    metadata: Metadata
     file_size: Optional[int]
     remove_after_upload: bool = True
     retry_number: int = 0
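Widening `UploadEvent.metadata` from `Dict[str, str]` to rohmu's `Metadata` matters because pghoard routinely puts non-string values (sizes, flags) into upload metadata, so the narrower annotation forced spurious casts. A sketch of the effect, with `Metadata` aliased locally on the assumption that rohmu defines it as a str-keyed mapping of arbitrary values:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, BinaryIO, Dict, Optional, Union

Metadata = Dict[str, Any]  # assumption: mirrors rohmu.typing.Metadata


@dataclass(frozen=True)
class UploadEvent:
    source_data: Union[BinaryIO, Path]
    metadata: Metadata
    file_size: Optional[int]
    remove_after_upload: bool = True


event = UploadEvent(
    source_data=Path("/tmp/chunk.partial"),
    metadata={"format": "pghoard-bb-v2", "original-file-size": 8192},  # int value now type-checks
    file_size=4096,
)
print(event.metadata["original-file-size"])
```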
diff --git a/test/basebackup/test_delta.py b/test/basebackup/test_delta.py
index 4f4f5557..bac78d28 100644
--- a/test/basebackup/test_delta.py
+++ b/test/basebackup/test_delta.py
@@ -4,7 +4,7 @@
 from pathlib import Path
 from queue import Empty
 from test.base import CONSTANT_TEST_RSA_PUBLIC_KEY
-from typing import Any, Callable, Dict, Generator, Tuple
+from typing import Any, Callable, Dict, Generator, Tuple, cast
 from unittest.mock import MagicMock, Mock
 
 import mock
@@ -419,18 +419,18 @@ def test_upload_single_delta_files_cleanup_after_error(
 ) -> None:
     _, file_hash = delta_file
 
-    with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest:
+    with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
+            patch.object(snapshotter, "update_snapshot_file_data", side_effect=Exception):
         mock_delta_upload_hexdigest.return_value = (200, 10, file_hash, True)
-        snapshotter.update_snapshot_file_data = Mock(side_effect=Exception)
         if not key_exists:
-            deltabasebackup.storage.delete_key.side_effect = FileNotFoundFromStorageError
+            cast(Mock, deltabasebackup.storage.delete_key).side_effect = FileNotFoundFromStorageError
 
         with snapshotter.lock:
             deltabasebackup._snapshot(snapshotter=snapshotter)  # pylint: disable=protected-access
             with pytest.raises(BackupFailure):
                 deltabasebackup._upload_single_delta_files(todo_hexdigests={file_hash}, snapshotter=snapshotter, progress=0)  # pylint: disable=protected-access
 
-    deltabasebackup.storage.delete_key.assert_called_with(f"abc/basebackup_delta/{file_hash}")
+    cast(Mock, deltabasebackup.storage.delete_key).assert_called_with(f"abc/basebackup_delta/{file_hash}")
 
 
 @pytest.mark.parametrize("files_count, initial_progress", [(1, 0), (4, 0), (10, 0), (1, 90), (15, 10)])
@@ -442,9 +442,9 @@ def test_upload_single_delta_files_progress(
     delta_hashes = {file_hash for _, file_hash in delta_files}
 
     with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
-            patch.object(deltabasebackup, "metrics") as mock_metrics:
+            patch.object(deltabasebackup, "metrics") as mock_metrics, \
+            patch.object(snapshotter, "update_snapshot_file_data"):
         mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
-        snapshotter.update_snapshot_file_data = Mock()
         with snapshotter.lock:
             deltabasebackup._snapshot(snapshotter=snapshotter)  # pylint: disable=protected-access
             deltabasebackup._upload_single_delta_files(  # pylint: disable=protected-access
diff --git a/test/test_common.py b/test/test_common.py
index 7c8067c9..d575f79b 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -15,9 +15,9 @@
 from rohmu.errors import Error
 
 from pghoard.common import (
-    create_pgpass_file, default_json_serialization, download_backup_meta_file, extract_pg_command_version_string,
-    extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode, pg_major_version,
-    pg_version_string_to_number, write_json_file
+    TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
+    extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
+    pg_major_version, pg_version_string_to_number, write_json_file
 )
 
 from .base import PGHoardTestCase
@@ -221,7 +221,7 @@ def convert_pg_command_version_to_number(command_version_string):
     ]
 )
 def test_download_backup_meta(metadata, extract_meta_func):
-    data = dict_to_tar_data(data=metadata, tar_name=".pghoard_tar_metadata.json")
+    data = dict_to_tar_data(data=metadata, tar_name=TAR_METADATA_FILENAME)
     storage = Mock()
     storage.get_contents_to_string.return_value = (data, {})
     backup_meta, backup_compressed_data = download_backup_meta_file(
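The test changes above replace `snapshotter.update_snapshot_file_data = Mock(...)` with `patch.object(...)` used as a context manager: the patch is automatically undone when the block exits, even on failure, whereas a plain assignment permanently clobbers the attribute on a fixture that may be shared between tests. The `cast(Mock, ...)` calls do the converse of the production-code casts, telling mypy that a storage attribute really is a mock in this test. A sketch of the restore behavior:

```python
from unittest.mock import patch


class Snapshotter:  # simplified stand-in for the test fixture's class
    def update_snapshot_file_data(self) -> str:
        return "real result"


snapshotter = Snapshotter()

with patch.object(snapshotter, "update_snapshot_file_data", side_effect=Exception("boom")):
    try:
        snapshotter.update_snapshot_file_data()
    except Exception as exc:
        print("inside patch:", exc)  # inside patch: boom

# The original method is restored once the with-block exits.
print("after patch:", snapshotter.update_snapshot_file_data())  # after patch: real result
```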
diff --git a/test/test_pghoard.py b/test/test_pghoard.py
index a968e8d4..0d364f5c 100644
--- a/test/test_pghoard.py
+++ b/test/test_pghoard.py
@@ -16,7 +16,9 @@
 import pytest
 
 import pghoard.pghoard as pghoard_module
-from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
+from pghoard.common import (
+    TAR_METADATA_FILENAME, BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file
+)
 from pghoard.pghoard import PGHoard
 from pghoard.pgutil import create_connection_string
 
@@ -466,7 +468,7 @@ def write_backup_files(what):
                 }
             }
         }
-        input_size = dict_to_tar_file(data=metadata, file_path=bb_path, tar_name=".pghoard_tar_metadata.json")
+        input_size = dict_to_tar_file(data=metadata, file_path=bb_path, tar_name=TAR_METADATA_FILENAME)
         for h in hexdigests:
             with open(Path(basebackup_delta_path) / h, "w") as digest_file, \
diff --git a/test/test_restore.py b/test/test_restore.py
index aa283dea..f5f18648 100644
--- a/test/test_restore.py
+++ b/test/test_restore.py
@@ -19,7 +19,7 @@
 
 import pytest
 
-from pghoard.common import write_json_file
+from pghoard.common import TAR_METADATA_FILENAME, write_json_file
 from pghoard.restore import (
     MAX_RETRIES, BasebackupFetcher, ChunkFetcher, FileDataInfo, FileInfoType, FilePathInfo, Restore, RestoreError,
     create_recovery_conf
@@ -541,7 +541,7 @@ def test_restore_get_delta_basebackup_data():
         }
     }
 
-    data = dict_to_tar_data(metadata, tar_name=".pghoard_tar_metadata.json")
+    data = dict_to_tar_data(metadata, tar_name=TAR_METADATA_FILENAME)
 
     r = Restore()
     r.config = {
diff --git a/test/util.py b/test/util.py
index 5cace0de..28d29ba9 100644
--- a/test/util.py
+++ b/test/util.py
@@ -2,14 +2,20 @@
 import tarfile
 import time
 from pathlib import Path
-from typing import Any, BinaryIO, Dict, Union
+from typing import TYPE_CHECKING, Any, BinaryIO, Dict, Optional, Union, cast
 
 from rohmu import rohmufile
+from rohmu.typing import FileLike
 
 from pghoard.common import json_encode
 
 from .conftest import PGHoardForTest
 
+if TYPE_CHECKING:
+    from tarfile import _Fileobj  # pylint: disable=no-name-in-module
+else:
+    _Fileobj = Any
+
 
 def wait_for_xlog(pghoard: PGHoardForTest, count: int):
     start = time.monotonic()
@@ -56,8 +62,10 @@ def dict_to_file_obj(fileobj: BinaryIO, data: Dict[str, Any], tar_name: str) ->
     ti.size = len(blob.getbuffer())
     ti.mtime = int(time.time())
 
-    with rohmufile.file_writer(compression_algorithm="snappy", compression_level=0, fileobj=fileobj) as output_obj:
-        with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
+    with rohmufile.file_writer(
+        compression_algorithm="snappy", compression_level=0, fileobj=cast(FileLike, fileobj)
+    ) as output_obj:
+        with tarfile.TarFile(fileobj=cast(Optional[_Fileobj], output_obj), mode="w") as output_tar:
             output_tar.addfile(ti, blob)
 
     return output_obj.tell()
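Finally, the `TYPE_CHECKING` guard in test/util.py is needed because `tarfile._Fileobj` exists only in the typeshed stubs, not in the runtime `tarfile` module, so an unconditional import would raise `ImportError`. The guard gives the type checker the precise type while the runtime falls back to `Any`:

```python
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Seen only by static type checkers; typeshed declares this alias.
    from tarfile import _Fileobj  # pylint: disable=no-name-in-module
else:
    # At runtime the name must still be defined so that
    # cast(Optional[_Fileobj], ...) in dict_to_file_obj() can evaluate it.
    _Fileobj = Any

print(_Fileobj)  # typing.Any
```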