diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 50c2be4c..00b85a5c 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -30,7 +30,7 @@ from pghoard.basebackup.delta import DeltaBaseBackup
 from pghoard.common import (
     TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
-    FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
+    FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
     extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
@@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
             db_conn.commit()
 
             self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
+            progress_instance = PersistedProgress()
+            progress_instance.reset_all(metrics=self.metrics)
             start_time = time.monotonic()
 
             if delta:
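The change above clears any previously persisted progress at the start of every local basebackup, so stall detection never compares against counters from an older run. A minimal sketch of the resulting call sequence, assuming the `PersistedProgress` API added to `pghoard/common.py` further down in this diff:

```python
# Sketch only (not pghoard itself): what run_local_tar_basebackup now does
# before streaming data. Metrics(statsd={}) mirrors how the tests in this
# diff construct a metrics client; the progress file is written to the
# default PROGRESS_FILE path, so run this somewhere writable.
from pghoard.common import PersistedProgress
from pghoard.metrics import Metrics

metrics = Metrics(statsd={})

progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=metrics)  # persists {"progress": {}} to disk
```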
diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index d2dc36ae..5064d140 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -18,7 +18,10 @@
 from rohmu import BaseTransfer, rohmufile
 from rohmu.dates import now
-from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
+from rohmu.delta.common import (
+    BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult,
+    SnapshotUploadResult
+)
 from rohmu.delta.snapshot import Snapshotter
 from rohmu.errors import FileNotFoundFromStorageError
 from rohmu.typing import HasRead, HasSeek
 
@@ -26,7 +29,7 @@
 from pghoard.basebackup.chunks import ChunkUploader
 from pghoard.common import (
     BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
-    FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
+    FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata
 )
 from pghoard.metrics import Metrics
 from pghoard.transfer import TransferQueue, UploadEvent
@@ -45,6 +48,8 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):
 FilesChunk = Set[Tuple]
 SnapshotFiles = Dict[str, SnapshotFile]
 
+PROGRESS_CHECK_INTERVAL = 10
+STALLED_PROGRESS_THRESHOLD = 600
 
 EMPTY_FILE_HASH = hashlib.blake2s().hexdigest()
@@ -73,9 +78,33 @@ def __init__(
         self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files()
         self.chunk_uploader = chunk_uploader
         self.data_file_format = data_file_format
+        self.last_flush_time: float = 0
 
     def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult:
-        snapshotter.snapshot(reuse_old_snapshotfiles=False)
+        def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics):
+            key = "snapshot_progress"
+            elapsed: float = time.monotonic() - self.last_flush_time
+            if elapsed > PROGRESS_CHECK_INTERVAL:
+                persisted_progress = PersistedProgress.read(self.metrics)
+                progress_info = persisted_progress.get(key)
+                tags: dict = {"phase": progress_step.value}
+
+                if progress_data["handled"] > progress_info.current_progress:
+                    progress_info.update(progress_data["handled"])
+                    persisted_progress.write(self.metrics)
+                    self.last_flush_time = time.monotonic()
+                    self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
+                else:
+                    stalled_age = progress_info.age
+                    self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)
+
+                    if stalled_age >= STALLED_PROGRESS_THRESHOLD:
+                        self.log.warning(
+                            "Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age
+                        )
+
+        self.last_flush_time = time.monotonic()
+        snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback)
         snapshot_result = SnapshotResult(end=None, state=None, hashes=None)
         snapshot_result.state = snapshotter.get_snapshot_state()
         snapshot_result.hashes = [
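The callback above is invoked by rohmu's snapshotter for every handled item, so it throttles itself: it only touches the persisted file and the gauge once per `PROGRESS_CHECK_INTERVAL` seconds. A standalone sketch of that pattern, with illustrative names that are not pghoard API:

```python
# Throttled stall tracking, as in progress_callback above: persist at most
# once per interval, report 0 while the counter advances, otherwise report
# how long it has been flat. ThrottledProgress is a hypothetical helper.
import time

PROGRESS_CHECK_INTERVAL = 10       # seconds between persistence attempts
STALLED_PROGRESS_THRESHOLD = 600   # warn once flat for this long

class ThrottledProgress:
    def __init__(self) -> None:
        self.last_flush_time = 0.0   # monotonic time of last persisted advance
        self.best = 0.0              # highest "handled" value seen so far
        self.best_at = time.time()   # wall-clock time of the last advance

    def on_progress(self, handled: float) -> float:
        """Return seconds stalled (0.0 while the counter is advancing)."""
        if time.monotonic() - self.last_flush_time <= PROGRESS_CHECK_INTERVAL:
            return 0.0  # checked recently; skip disk and metric work
        if handled > self.best:
            self.best, self.best_at = handled, time.time()
            self.last_flush_time = time.monotonic()  # pghoard persists here
            return 0.0
        return time.time() - self.best_at  # caller warns past the threshold
```

Note the split between clocks, mirroring the diff: the monotonic clock drives throttling, while the wall clock feeds the persisted `last_updated_time` so the stall age survives process restarts.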
diff --git a/pghoard/common.py b/pghoard/common.py
index ee444b8c..6f93b7cb 100644
--- a/pghoard/common.py
+++ b/pghoard/common.py
@@ -15,6 +15,7 @@
 import re
 import tarfile
 import tempfile
+import threading
 import time
 from contextlib import suppress
 from dataclasses import dataclass, field
@@ -24,13 +25,16 @@
 from threading import Thread
 from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)
 
+from pydantic import BaseModel, Field
 from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
 from rohmu.errors import Error, InvalidConfigurationError
 from rohmu.typing import FileLike, HasName
 
 from pghoard import pgutil
+from pghoard.metrics import Metrics
 
 TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
+PROGRESS_FILE: Final[str] = "persisted_progress_file.json"
 
 LOG = logging.getLogger("pghoard.common")
@@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum):
     pipe = "pipe"
 
 
+class ProgressData(BaseModel):
+    current_progress: float = 0
+    last_updated_time: float = 0
+
+    @property
+    def age(self) -> float:
+        return time.time() - self.last_updated_time
+
+    def update(self, current_progress: float) -> None:
+        self.current_progress = current_progress
+        self.last_updated_time = time.time()
+
+
+def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None):
+    temp_dir = temp_dir or os.path.dirname(file_path)
+    temp_file = None
+    try:
+        with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file:
+            temp_file.write(data)
+            temp_path = temp_file.name
+        os.rename(temp_path, file_path)
+    except Exception as ex:  # pylint: disable=broad-except
+        LOG.exception("Failed to write file atomically: %r", ex)
+        if temp_file:
+            with suppress(FileNotFoundError):
+                os.unlink(temp_file.name)
+
+
+class PersistedProgress(BaseModel):
+    progress: Dict[str, ProgressData] = Field(default_factory=dict)
+    _lock: threading.Lock = threading.Lock()
+
+    @classmethod
+    def read(cls, metrics: Metrics) -> "PersistedProgress":
+        if os.path.exists(PROGRESS_FILE):
+            with open(PROGRESS_FILE, "r") as file:
+                try:
+                    return cls.parse_raw(file.read())
+                except Exception as ex:  # pylint: disable=broad-except
+                    LOG.exception("Failed to read persisted progress file: %r", ex)
+                    metrics.unexpected_exception(ex, where="read_persisted_progress")
+        return cls()
+
+    def write(self, metrics: Metrics):
+        with self._lock:
+            try:
+                data = self.json()
+                atomic_write(PROGRESS_FILE, data)
+            except Exception as ex:  # pylint: disable=broad-except
+                metrics.unexpected_exception(ex, where="write_persisted_progress")
+
+    def get(self, key: str) -> ProgressData:
+        self.progress.setdefault(key, ProgressData())
+        return self.progress[key]
+
+    def reset(self, key: str, metrics: Metrics) -> None:
+        if key in self.progress:
+            del self.progress[key]
+            self.write(metrics=metrics)
+
+    def reset_all(self, metrics: Metrics) -> None:
+        self.progress = {}
+        self.write(metrics=metrics)
+
+
 def create_pgpass_file(connection_string_or_info):
     """Look up password from the given object which can be a dict or a string and
     write a possible password in a pgpass file;
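`PersistedProgress` is the core of the feature: a pydantic model serialized to `PROGRESS_FILE` via an atomic temp-file-plus-rename, so a crash mid-write never leaves a truncated JSON file. A sketch of the round-trip, assuming the default (relative) `PROGRESS_FILE` path is writable from the current directory:

```python
# Example round-trip through the PersistedProgress API defined above.
# PROGRESS_FILE is module-level state, which is why the tests below patch
# it; here we just use the default path. "some-upload-key" is illustrative.
from pghoard.common import PersistedProgress
from pghoard.metrics import Metrics

metrics = Metrics(statsd={})

state = PersistedProgress.read(metrics=metrics)   # empty model on first use
info = state.get("some-upload-key")               # ProgressData, created on demand
info.update(1024.0)                               # bytes handled so far + timestamp
state.write(metrics=metrics)                      # atomic rename onto PROGRESS_FILE

# Later readers see the same counter; info.age gives seconds since the
# last forward movement, which is what the stall gauge reports.
reloaded = PersistedProgress.read(metrics=metrics)
assert reloaded.get("some-upload-key").current_progress == 1024.0
```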
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index 1af3daf3..bf2f2b27 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -25,7 +25,7 @@
 from rohmu.typing import Metadata
 
 from pghoard.common import (
-    CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
+    CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
@@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:
 
     def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
         metric_data = {}
+        persisted_progress = PersistedProgress.read(metrics=self.metrics)
+
         with self._tracked_events_lock:
             if file_key not in self._tracked_events:
                 raise Exception(f"UploadEvent for {file_key} is not being tracked.")
 
             file_type = self._tracked_events[file_key].file_type
             if file_type in (
-                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+                FileType.Basebackup,
+                FileType.Basebackup_chunk,
+                FileType.Basebackup_delta,
+                FileType.Basebackup_delta_chunk,
             ):
+                progress_info = persisted_progress.get(file_key)
+                if total_bytes_uploaded > progress_info.current_progress:
+                    progress_info.update(total_bytes_uploaded)
+                    persisted_progress.write(metrics=self.metrics)
+                    self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
+                else:
+                    stalled_age = progress_info.age
+                    self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age)
+                    if stalled_age >= 600:
+                        self.log.warning(
+                            "Upload for file %s has been stalled for %s seconds.",
+                            file_key,
+                            stalled_age,
+                        )
                 metric_data = {
                     "metric": "pghoard.basebackup_bytes_uploaded",
                     "inc_value": total_bytes_uploaded,
@@ -410,6 +429,13 @@ def run_safe(self):
                 time.monotonic() - start_time
             )
 
+            if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ):
+                if result.success:
+                    persisted_progress = PersistedProgress.read(metrics=self.metrics)
+                    persisted_progress.reset(key, metrics=self.metrics)
+
         self.fetch_manager.stop()
         self.log.debug("Quitting TransferAgent")
@@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
             # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
             self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
-
+            if file_to_transfer.file_type in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ) and file_to_transfer.retry_number < 2:
+                persisted_progress = PersistedProgress.read(metrics=self.metrics)
+                persisted_progress.reset(key, metrics=self.metrics)
             self.transfer_queue.put(file_to_transfer)
             return None
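The transfer-side changes give the `pghoard.seconds_since_backup_progress_stalled` gauge its semantics: 0 while uploaded bytes keep growing, otherwise the age of the last recorded advance; the persisted entry is reset on successful upload and on early retries so a restarted transfer does not immediately look stalled. A condensed view of the branch added to `increment()`, with `stall_seconds` as an illustrative helper rather than pghoard API:

```python
# Decision logic from increment() above in isolation: advance and return 0,
# or return how long the byte counter has been flat. The caller feeds the
# return value to the stall gauge and warns at 600 seconds.
from pghoard.common import ProgressData

def stall_seconds(progress_info: ProgressData, total_bytes_uploaded: float) -> float:
    if total_bytes_uploaded > progress_info.current_progress:
        progress_info.update(total_bytes_uploaded)  # caller persists afterwards
        return 0.0
    return progress_info.age  # seconds since the last forward movement
```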
diff --git a/test/basebackup/test_delta.py b/test/basebackup/test_delta.py
index bac78d28..e1a6d217 100644
--- a/test/basebackup/test_delta.py
+++ b/test/basebackup/test_delta.py
@@ -442,22 +442,39 @@ def test_upload_single_delta_files_progress(
     delta_hashes = {file_hash for _, file_hash in delta_files}
 
     with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
-        patch.object(deltabasebackup, "metrics") as mock_metrics, \
+            patch.object(deltabasebackup, "metrics") as mock_metrics, \
             patch.object(snapshotter, "update_snapshot_file_data"):
         mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
         with snapshotter.lock:
-            deltabasebackup._snapshot(snapshotter=snapshotter)  # pylint: disable=protected-access
-            deltabasebackup._upload_single_delta_files(  # pylint: disable=protected-access
-                todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
-            )
-    expected_calls = [
-        mock.call(
-            "pghoard.basebackup_estimated_progress",
-            initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
-            tags={"site": "delta"}
-        ) for idx in range(files_count)
-    ]
-    assert mock_metrics.gauge.mock_calls == expected_calls
+            with patch("pghoard.basebackup.delta.PROGRESS_CHECK_INTERVAL", new=1):
+                deltabasebackup._snapshot(snapshotter=snapshotter)  # pylint: disable=protected-access
+                deltabasebackup._upload_single_delta_files(  # pylint: disable=protected-access
+                    todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
+                )
+    expected_calls = [
+        mock.call(
+            "pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "creating_missing_directories"}
+        ),
+        mock.call("pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "adding_missing_files"}),
+    ]
+
+    expected_calls += [
+        mock.call(
+            "pghoard.seconds_since_backup_progress_stalled",
+            0,
+            tags={"phase": "processing_and_hashing_snapshot_files"}
+        ) for _ in range(files_count)
+    ]
+
+    expected_calls += [
+        mock.call(
+            "pghoard.basebackup_estimated_progress",
+            initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
+            tags={"site": "delta"}
+        ) for idx in range(files_count)
+    ]
+
+    assert mock_metrics.gauge.mock_calls == expected_calls
 
 
 def test_upload_single_delta_files(
diff --git a/test/test_common.py b/test/test_common.py
index d575f79b..7aaa42e5 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -14,8 +14,9 @@
 from mock.mock import Mock
 from rohmu.errors import Error
 
+from pghoard import metrics
 from pghoard.common import (
-    TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
+    TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
     extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
     pg_major_version, pg_version_string_to_number, write_json_file
 )
@@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):
 
         assert ob2 == ob2_
 
+    def test_persisted_progress(self, mocker, tmp_path):
+        test_progress_file = tmp_path / "test_progress.json"
+        original_time = 1625072042.123456
+        test_data = {
+            "progress": {
+                "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
+                    "current_progress": 100,
+                    "last_updated_time": original_time
+                }
+            }
+        }
+
+        with open(test_progress_file, "w") as file:
+            json.dump(test_data, file)
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
+        persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
+        assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                           ].current_progress == 100
+        assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                           ].last_updated_time == 1625072042.123456
+
+        new_progress = 200
+        progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
+        progress_info.update(new_progress)
+        persisted_progress.write(metrics=metrics.Metrics(statsd={}))
+
+        updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                         ].current_progress == new_progress
+        assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                         ].last_updated_time > original_time
+
+    def test_default_persisted_progress_creation(self, mocker, tmp_path):
+        tmp_file = tmp_path / "non_existent_progress.json"
+        assert not tmp_file.exists()
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
+        persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+
+        assert persisted_progress.progress == {}
+        persisted_progress.write(metrics=metrics.Metrics(statsd={}))
+
+        assert tmp_file.exists()
+        with open(tmp_file, "r") as file:
+            data = json.load(file)
+            assert data == {"progress": {}}
+
 
 def test_pg_major_version():
     assert pg_major_version("10") == "10"
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 703383fb..31475a96 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -15,7 +15,7 @@
 from rohmu.errors import FileNotFoundFromStorageError, StorageError
 
 from pghoard import metrics
-from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
+from pghoard.common import (CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent)
 from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)
 
 # pylint: disable=attribute-defined-outside-init
@@ -316,3 +316,27 @@ def test_handle_metadata_error(self):
         evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
         assert evt.success is False
         assert isinstance(evt.exception, Exception)
+
+    def test_handle_upload_with_persisted_progress(self, mocker, tmp_path):
+
+        temp_progress_file = tmp_path / "test_progress.json"
+        assert not temp_progress_file.exists()
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", temp_progress_file)
+        upload_event = UploadEvent(
+            backup_site_name="test_site",
+            file_type=FileType.Basebackup,
+            file_path=Path(self.foo_basebackup_path),
+            source_data=Path(self.foo_basebackup_path),
+            metadata={},
+            file_size=3,
+            callback_queue=CallbackQueue(),
+            remove_after_upload=True
+        )
+
+        self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
+        updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert temp_progress_file.exists()
+        assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
+        if temp_progress_file.exists():
+            temp_progress_file.unlink()
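Since the progress state lives in a plain JSON file, it can also be inspected out of band, e.g. while investigating a backup whose stall gauge is climbing. A hypothetical one-off inspection, assuming the default `PROGRESS_FILE` name from `pghoard/common.py` and the process's working directory:

```python
# Dump the persisted progress state for debugging; keys are upload file
# keys (or "snapshot_progress" during delta snapshots), values hold the
# byte counter and the wall-clock time of the last advance.
import json

with open("persisted_progress_file.json") as f:
    print(json.dumps(json.load(f), indent=2))
# => {"progress": {"<file key>": {"current_progress": ..., "last_updated_time": ...}}}
```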