From 5a186f5d6229143ace9c037754e7ad3fc596389d Mon Sep 17 00:00:00 2001 From: Sebin Sunny Date: Tue, 6 Feb 2024 13:15:45 +1100 Subject: [PATCH] This PR improves monitoring of pg basebackups. During a backup, it regularly checks how much data has been uploaded and compares this to the last recorded amount in a persisted progress file. If the upload is progressing, it updates the record with the new data and current time. If the backup has not advanced compared to the previous value, it reports the time elapsed since the last progress and emits stalled metrics. Once a backup is complete, the record is reset. [SRE-7631] --- pghoard/basebackup/base.py | 4 +- pghoard/basebackup/delta.py | 30 +++++++++++++-- pghoard/common.py | 69 +++++++++++++++++++++++++++++++++++ pghoard/transfer.py | 36 ++++++++++++++++-- test/basebackup/test_delta.py | 16 ++++++++ test/test_common.py | 52 +++++++++++++++++++++++++- test/test_transferagent.py | 26 ++++++++++++- 7 files changed, 224 insertions(+), 9 deletions(-) diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py index 50c2be4c..00b85a5c 100644 --- a/pghoard/basebackup/base.py +++ b/pghoard/basebackup/base.py @@ -30,7 +30,7 @@ from pghoard.basebackup.delta import DeltaBaseBackup from pghoard.common import ( TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData, - FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file, + FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file, extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) @@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = db_conn.commit() self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base) + progress_instance = PersistedProgress() + progress_instance.reset_all(metrics=self.metrics) start_time = time.monotonic() if delta: diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py index d2dc36ae..abf133ef 100644 --- a/pghoard/basebackup/delta.py +++ b/pghoard/basebackup/delta.py @@ -18,7 +18,10 @@ from rohmu import BaseTransfer, rohmufile from rohmu.dates import now -from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult) +from rohmu.delta.common import ( + BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult, + SnapshotUploadResult +) from rohmu.delta.snapshot import Snapshotter from rohmu.errors import FileNotFoundFromStorageError from rohmu.typing import HasRead, HasSeek @@ -26,7 +29,7 @@ from pghoard.basebackup.chunks import ChunkUploader from pghoard.common import ( BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType, - FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata + FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata ) from pghoard.metrics import Metrics from pghoard.transfer import TransferQueue, UploadEvent @@ -73,9 +76,30 @@ def __init__( self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files() self.chunk_uploader = chunk_uploader self.data_file_format = data_file_format + self.last_flush_time: float = 0 def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult: - snapshotter.snapshot(reuse_old_snapshotfiles=False) + def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics): + key = "snapshot_progress" + persisted_progress = PersistedProgress.read(self.metrics) + progress_info = persisted_progress.get(key) + tags: dict = {"phase": progress_step.value} + if progress_data["handled"] > progress_info.current_progress: + progress_info.update(progress_data["handled"]) + elapsed: float = time.monotonic() - self.last_flush_time + if elapsed > 10: + persisted_progress.write(self.metrics) + self.last_flush_time = time.monotonic() + self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags) + else: + stalled_age = progress_info.age + self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags) + + if stalled_age >= 600: + self.log.warning("Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age) + + self.last_flush_time = time.monotonic() + snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback) snapshot_result = SnapshotResult(end=None, state=None, hashes=None) snapshot_result.state = snapshotter.get_snapshot_state() snapshot_result.hashes = [ diff --git a/pghoard/common.py b/pghoard/common.py index ee444b8c..6f93b7cb 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -15,6 +15,7 @@ import re import tarfile import tempfile +import threading import time from contextlib import suppress from dataclasses import dataclass, field @@ -24,13 +25,16 @@ from threading import Thread from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast) +from pydantic import BaseModel, Field from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile from rohmu.errors import Error, InvalidConfigurationError from rohmu.typing import FileLike, HasName from pghoard import pgutil +from pghoard.metrics import Metrics TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json" +PROGRESS_FILE: Final[str] = "persisted_progress_file.json" LOG = logging.getLogger("pghoard.common") @@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum): pipe = "pipe" +class ProgressData(BaseModel): + current_progress: float = 0 + last_updated_time: float = 0 + + @property + def age(self) -> float: + return time.time() - self.last_updated_time + + def update(self, current_progress: float) -> None: + self.current_progress = current_progress + self.last_updated_time = time.time() + + +def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None): + temp_dir = temp_dir or os.path.dirname(file_path) + temp_file = None + try: + with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file: + temp_file.write(data) + temp_path = temp_file.name + os.rename(temp_path, file_path) + except Exception as ex: # pylint: disable=broad-except + LOG.exception("Failed to write file atomically: %r", ex) + if temp_file: + with suppress(FileNotFoundError): + os.unlink(temp_file.name) + + +class PersistedProgress(BaseModel): + progress: Dict[str, ProgressData] = Field(default_factory=dict) + _lock: threading.Lock = threading.Lock() + + @classmethod + def read(cls, metrics: Metrics) -> "PersistedProgress": + if os.path.exists(PROGRESS_FILE): + with open(PROGRESS_FILE, "r") as file: + try: + return cls.parse_raw(file.read()) + except Exception as ex: # pylint: disable=broad-except + LOG.exception("Failed to read persisted progress file: %r", ex) + metrics.unexpected_exception(ex, where="read_persisted_progress") + return cls() + + def write(self, metrics: Metrics): + with self._lock: + try: + data = self.json() + atomic_write(PROGRESS_FILE, data) + except Exception as ex: # pylint: disable=broad-except + metrics.unexpected_exception(ex, where="write_persisted_progress") + + def get(self, key: str) -> ProgressData: + self.progress.setdefault(key, ProgressData()) + return self.progress[key] + + def reset(self, key: str, metrics: Metrics) -> None: + if key in self.progress: + del self.progress[key] + self.write(metrics=metrics) + + def reset_all(self, metrics: Metrics) -> None: + self.progress = {} + self.write(metrics=metrics) + + def create_pgpass_file(connection_string_or_info): """Look up password from the given object which can be a dict or a string and write a possible password in a pgpass file; diff --git a/pghoard/transfer.py b/pghoard/transfer.py index 1af3daf3..bf2f2b27 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -25,7 +25,7 @@ from rohmu.typing import Metadata from pghoard.common import ( - CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, + CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, get_object_storage_config ) from pghoard.fetcher import FileFetchManager @@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None: def increment(self, file_key: str, total_bytes_uploaded: float) -> None: metric_data = {} + persisted_progress = PersistedProgress.read(metrics=self.metrics) + with self._tracked_events_lock: if file_key not in self._tracked_events: raise Exception(f"UploadEvent for {file_key} is not being tracked.") file_type = self._tracked_events[file_key].file_type if file_type in ( - FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + FileType.Basebackup, + FileType.Basebackup_chunk, + FileType.Basebackup_delta, + FileType.Basebackup_delta_chunk, ): + progress_info = persisted_progress.get(file_key) + if total_bytes_uploaded > progress_info.current_progress: + progress_info.update(total_bytes_uploaded) + persisted_progress.write(metrics=self.metrics) + self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0) + else: + stalled_age = progress_info.age + self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age) + if stalled_age >= 600: + self.log.warning( + "Upload for file %s has been stalled for %s seconds.", + file_key, + stalled_age, + ) metric_data = { "metric": "pghoard.basebackup_bytes_uploaded", "inc_value": total_bytes_uploaded, @@ -410,6 +429,13 @@ def run_safe(self): time.monotonic() - start_time ) + if file_to_transfer.operation in {TransferOperation.Upload} and filetype in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ): + if result.success: + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) + self.fetch_manager.stop() self.log.debug("Quitting TransferAgent") @@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent): # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20)) - + if file_to_transfer.file_type in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ) and file_to_transfer.retry_number < 2: + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) self.transfer_queue.put(file_to_transfer) return None diff --git a/test/basebackup/test_delta.py b/test/basebackup/test_delta.py index bac78d28..6ba01c1a 100644 --- a/test/basebackup/test_delta.py +++ b/test/basebackup/test_delta.py @@ -451,12 +451,28 @@ def test_upload_single_delta_files_progress( todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress ) expected_calls = [ + mock.call( + "pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "creating_missing_directories"} + ), + mock.call("pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "adding_missing_files"}), + ] + + expected_calls += [ + mock.call( + "pghoard.seconds_since_backup_progress_stalled", + 0, + tags={"phase": "processing_and_hashing_snapshot_files"} + ) for _ in range(files_count) + ] + + expected_calls += [ mock.call( "pghoard.basebackup_estimated_progress", initial_progress + (idx + 1) * (100 - initial_progress) / files_count, tags={"site": "delta"} ) for idx in range(files_count) ] + assert mock_metrics.gauge.mock_calls == expected_calls diff --git a/test/test_common.py b/test/test_common.py index d575f79b..7aaa42e5 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -14,8 +14,9 @@ from mock.mock import Mock from rohmu.errors import Error +from pghoard import metrics from pghoard.common import ( - TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file, + TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file, extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode, pg_major_version, pg_version_string_to_number, write_json_file ) @@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir): assert ob2 == ob2_ + def test_persisted_progress(self, mocker, tmp_path): + test_progress_file = tmp_path / "test_progress.json" + original_time = 1625072042.123456 + test_data = { + "progress": { + "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": { + "current_progress": 100, + "last_updated_time": original_time + } + } + } + + with open(test_progress_file, "w") as file: + json.dump(test_data, file) + + mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file) + persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == 100 + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].last_updated_time == 1625072042.123456 + + new_progress = 200 + progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b") + progress_info.update(new_progress) + persisted_progress.write(metrics=metrics.Metrics(statsd={})) + + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == new_progress + assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].last_updated_time > original_time + + def test_default_persisted_progress_creation(self, mocker, tmp_path): + tmp_file = tmp_path / "non_existent_progress.json" + assert not tmp_file.exists() + + mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file)) + persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + + assert persisted_progress.progress == {} + persisted_progress.write(metrics=metrics.Metrics(statsd={})) + + assert tmp_file.exists() + with open(tmp_file, "r") as file: + data = json.load(file) + assert data == {"progress": {}} + def test_pg_major_version(): assert pg_major_version("10") == "10" diff --git a/test/test_transferagent.py b/test/test_transferagent.py index 703383fb..31475a96 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -15,7 +15,7 @@ from rohmu.errors import FileNotFoundFromStorageError, StorageError from pghoard import metrics -from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent +from pghoard.common import (CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent) from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker) # pylint: disable=attribute-defined-outside-init @@ -316,3 +316,27 @@ def test_handle_metadata_error(self): evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar") assert evt.success is False assert isinstance(evt.exception, Exception) + + def test_handle_upload_with_persisted_progress(self, mocker, tmp_path): + + temp_progress_file = tmp_path / "test_progress.json" + assert not temp_progress_file.exists() + + mocker.patch("pghoard.common.PROGRESS_FILE", temp_progress_file) + upload_event = UploadEvent( + backup_site_name="test_site", + file_type=FileType.Basebackup, + file_path=Path(self.foo_basebackup_path), + source_data=Path(self.foo_basebackup_path), + metadata={}, + file_size=3, + callback_queue=CallbackQueue(), + remove_after_upload=True + ) + + self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event) + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert temp_progress_file.exists() + assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3 + if temp_progress_file.exists(): + temp_progress_file.unlink()