From bd1d7d661b1f61038d00d4e77dcfc54294628533 Mon Sep 17 00:00:00 2001
From: Sebin Sunny
Date: Tue, 6 Feb 2024 13:15:45 +1100
Subject: [PATCH] Improve monitoring of pg basebackups [SRE-7631]

During a backup, the transfer agent periodically checks how much data has
been uploaded and compares it against the last amount recorded in a
persisted progress file. If the upload has advanced, the record is updated
with the new value and the current time. If it has not advanced, the time
elapsed since the last recorded progress is reported and a stalled metric
is emitted. Once a backup completes, its record is reset.
---
 pghoard/basebackup/base.py |  4 ++-
 pghoard/common.py          | 57 ++++++++++++++++++++++++++++++++++++++
 pghoard/transfer.py        | 36 ++++++++++++++++++++++--
 test/test_common.py        | 52 +++++++++++++++++++++++++++++++++-
 test/test_transferagent.py | 26 ++++++++++++++++-
 5 files changed, 169 insertions(+), 6 deletions(-)

diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 50c2be4c..00b85a5c 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -30,7 +30,7 @@ from pghoard.basebackup.delta import DeltaBaseBackup
 from pghoard.common import (
     TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
-    FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
+    FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
     extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
@@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
             db_conn.commit()
 
             self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
+            progress_instance = PersistedProgress()
+            progress_instance.reset_all(metrics=self.metrics)
             start_time = time.monotonic()
 
             if delta:
diff --git a/pghoard/common.py b/pghoard/common.py
index ee444b8c..2ae49f2c 100644
--- a/pghoard/common.py
+++ b/pghoard/common.py
@@ -15,6 +15,7 @@
 import re
 import tarfile
 import tempfile
+import threading
 import time
 from contextlib import suppress
 from dataclasses import dataclass, field
@@ -24,13 +25,16 @@
 from threading import Thread
 from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)
 
+from pydantic import BaseModel, Field
 from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
 from rohmu.errors import Error, InvalidConfigurationError
 from rohmu.typing import FileLike, HasName
 
 from pghoard import pgutil
+from pghoard.metrics import Metrics
 
 TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
+PROGRESS_FILE: Final[str] = "persisted_progress_file.json"
 
 LOG = logging.getLogger("pghoard.common")
@@ -100,6 +104,59 @@ class BaseBackupMode(StrEnum):
     pipe = "pipe"
 
 
+class ProgressData(BaseModel):
+    current_progress: float = 0
+    last_updated_time: float = 0
+
+    @property
+    def age(self) -> float:
+        return time.time() - self.last_updated_time
+
+    def update(self, current_progress: float) -> None:
+        self.current_progress = current_progress
+        self.last_updated_time = time.time()
+
+
+class PersistedProgress(BaseModel):
+    progress: Dict[str, ProgressData] = Field(default_factory=dict)
+    _lock: threading.Lock = threading.Lock()
+
+    @classmethod
+    def read(cls, metrics: Metrics) -> "PersistedProgress":
+        if os.path.exists(PROGRESS_FILE):
+            with open(PROGRESS_FILE, "r") as file:
+                try:
+                    return cls.parse_raw(file.read())
+                except Exception as ex:  # pylint: disable=broad-except
+                    LOG.exception("Failed to read persisted progress file: %r", ex)
+                    metrics.unexpected_exception(ex, where="read_persisted_progress")
+        return cls()
+
+    def write(self, metrics: Metrics):
+        with self._lock:
+            try:
+                with tempfile.NamedTemporaryFile("w", delete=False, dir=os.path.dirname(PROGRESS_FILE)) as temp_file:
+                    temp_file.write(self.json())
+                    temp_path = temp_file.name
+                os.rename(temp_path, PROGRESS_FILE)
+            except Exception as ex:  # pylint: disable=broad-except
+                LOG.exception("Failed to write persisted progress file: %r", ex)
+                metrics.unexpected_exception(ex, where="write_persisted_progress")
+
+    def get(self, key: str) -> ProgressData:
+        self.progress.setdefault(key, ProgressData())
+        return self.progress[key]
+
+    def reset(self, key: str, metrics: Metrics) -> None:
+        if key in self.progress:
+            del self.progress[key]
+            self.write(metrics=metrics)
+
+    def reset_all(self, metrics: Metrics) -> None:
+        self.progress = {}
+        self.write(metrics=metrics)
+
+
 def create_pgpass_file(connection_string_or_info):
     """Look up password from the given object which can be a dict or a string and
     write a possible password in a pgpass file;
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index 1af3daf3..44f4a61e 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -25,7 +25,7 @@
 from rohmu.typing import Metadata
 
 from pghoard.common import (
-    CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
+    CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
@@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:
 
     def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
         metric_data = {}
+        persisted_progress = PersistedProgress.read(metrics=self.metrics)
+
         with self._tracked_events_lock:
             if file_key not in self._tracked_events:
                 raise Exception(f"UploadEvent for {file_key} is not being tracked.")
             file_type = self._tracked_events[file_key].file_type
             if file_type in (
-                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+                FileType.Basebackup,
+                FileType.Basebackup_chunk,
+                FileType.Basebackup_delta,
+                FileType.Basebackup_delta_chunk,
             ):
+                progress_info = persisted_progress.get(file_key)
+                if total_bytes_uploaded > progress_info.current_progress:
+                    progress_info.update(total_bytes_uploaded)
+                    persisted_progress.write(metrics=self.metrics)
+                    self.metrics.gauge("pghoard.basebackup_stalled", 0)
+                else:
+                    stalled_age = progress_info.age
+                    self.metrics.gauge("pghoard.basebackup_stalled", stalled_age)
+                    if stalled_age >= 600:
+                        self.log.warning(
+                            "Upload for file %s has been stalled for %s seconds.",
+                            file_key,
+                            stalled_age,
+                        )
                 metric_data = {
                     "metric": "pghoard.basebackup_bytes_uploaded",
                     "inc_value": total_bytes_uploaded,
@@ -410,6 +429,13 @@ def run_safe(self):
                     time.monotonic() - start_time
                 )
 
+            if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ):
+                if result.success:
+                    persisted_progress = PersistedProgress.read(metrics=self.metrics)
+                    persisted_progress.reset(key, metrics=self.metrics)
+
         self.fetch_manager.stop()
         self.log.debug("Quitting TransferAgent")
@@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
                 # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
                 self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
-
+            if file_to_transfer.file_type in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ) and file_to_transfer.retry_number > 2:
+                persisted_progress = PersistedProgress.read(metrics=self.metrics)
+                persisted_progress.reset(key, metrics=self.metrics)
             self.transfer_queue.put(file_to_transfer)
             return None
diff --git a/test/test_common.py b/test/test_common.py
index d575f79b..7aaa42e5 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -14,8 +14,9 @@
 from mock.mock import Mock
 from rohmu.errors import Error
 
+from pghoard import metrics
 from pghoard.common import (
-    TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
+    TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
     extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
     pg_major_version, pg_version_string_to_number, write_json_file
 )
@@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):
 
         assert ob2 == ob2_
 
+    def test_persisted_progress(self, mocker, tmp_path):
+        test_progress_file = tmp_path / "test_progress.json"
+        original_time = 1625072042.123456
+        test_data = {
+            "progress": {
+                "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
+                    "current_progress": 100,
+                    "last_updated_time": original_time
+                }
+            }
+        }
+
+        with open(test_progress_file, "w") as file:
+            json.dump(test_data, file)
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
+        persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
+        assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                           ].current_progress == 100
+        assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                           ].last_updated_time == original_time
+
+        new_progress = 200
+        progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
+        progress_info.update(new_progress)
+        persisted_progress.write(metrics=metrics.Metrics(statsd={}))
+
+        updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                         ].current_progress == new_progress
+        assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                         ].last_updated_time > original_time
+
+    def test_default_persisted_progress_creation(self, mocker, tmp_path):
+        tmp_file = tmp_path / "non_existent_progress.json"
+        assert not tmp_file.exists()
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
+        persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+
+        assert persisted_progress.progress == {}
+        persisted_progress.write(metrics=metrics.Metrics(statsd={}))
+
+        assert tmp_file.exists()
+        with open(tmp_file, "r") as file:
+            data = json.load(file)
+            assert data == {"progress": {}}
+
 
 def test_pg_major_version():
     assert pg_major_version("10") == "10"
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 703383fb..31475a96 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -15,7 +15,7 @@
 from rohmu.errors import FileNotFoundFromStorageError, StorageError
 
 from pghoard import metrics
-from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
+from pghoard.common import (CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent)
 from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)
 
 # pylint: disable=attribute-defined-outside-init
@@ -316,3 +316,27 @@ def test_handle_metadata_error(self):
         evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
         assert evt.success is False
         assert isinstance(evt.exception, Exception)
+
+    def test_handle_upload_with_persisted_progress(self, mocker, tmp_path):
+
+        temp_progress_file = tmp_path / "test_progress.json"
+        assert not temp_progress_file.exists()
+
+        mocker.patch("pghoard.common.PROGRESS_FILE", temp_progress_file)
+        upload_event = UploadEvent(
+            backup_site_name="test_site",
+            file_type=FileType.Basebackup,
+            file_path=Path(self.foo_basebackup_path),
+            source_data=Path(self.foo_basebackup_path),
+            metadata={},
+            file_size=3,
+            callback_queue=CallbackQueue(),
+            remove_after_upload=True
+        )
+
+        self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
+        updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
+        assert temp_progress_file.exists()
+        assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
+        if temp_progress_file.exists():
+            temp_progress_file.unlink()
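--
Usage sketch (below the patch trailer, so not part of the applied diff): how
the new PersistedProgress pieces fit together, assuming pghoard is importable
and the progress file is written to the current working directory. The file
key below is hypothetical; in the PR the keys are the upload file keys tracked
by the transfer agent.

    from pghoard import metrics
    from pghoard.common import PersistedProgress

    m = metrics.Metrics(statsd={})

    # Load previously persisted state, or start empty if the file is missing.
    persisted = PersistedProgress.read(metrics=m)

    key = "example-site/basebackup/chunk.0001"  # hypothetical file key
    info = persisted.get(key)
    bytes_uploaded = 150.0
    if bytes_uploaded > info.current_progress:
        info.update(bytes_uploaded)  # record the new byte count and timestamp
        persisted.write(metrics=m)   # atomic write: tempfile then os.rename
    else:
        # No forward progress since the last write; this age is what the PR
        # emits as the pghoard.basebackup_stalled gauge.
        print(f"stalled for {info.age:.0f}s")

    persisted.reset(key, metrics=m)  # called once the upload completes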