From bb0625976371612eb5b091b73cfbe68bb4943a80 Mon Sep 17 00:00:00 2001 From: Sebin Sunny Date: Tue, 6 Feb 2024 13:15:45 +1100 Subject: [PATCH] This PR improves monitoring of pg basebackups. During a backup, it regularly checks how much data has been uploaded and compares this to the last recorded amount in a persisted progress file. If the upload is progressing, it updates the record with the new data and current time. If the backup has not advanced compared to the previous value, it reports the time elapsed since the last progress and emits stalled metrics. Once a backup is complete, the record is reset. [SRE-7631] --- pghoard/common.py | 53 ++++++++++++++++++++++++++++++++++++++ pghoard/transfer.py | 35 ++++++++++++++++++++++--- test/test_common.py | 38 ++++++++++++++++++++++++++- test/test_transferagent.py | 20 +++++++++++++- 4 files changed, 141 insertions(+), 5 deletions(-) diff --git a/pghoard/common.py b/pghoard/common.py index ee444b8c..3cd92f03 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -15,6 +15,7 @@ import re import tarfile import tempfile +import threading import time from contextlib import suppress from dataclasses import dataclass, field @@ -24,13 +25,16 @@ from threading import Thread from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast) +from pydantic import BaseModel from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile from rohmu.errors import Error, InvalidConfigurationError from rohmu.typing import FileLike, HasName from pghoard import pgutil +from pghoard.metrics import Metrics TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json" +PROGRESS_FILE: Final[str] = "persisted_progress_file.json" LOG = logging.getLogger("pghoard.common") @@ -100,6 +104,55 @@ class BaseBackupMode(StrEnum): pipe = "pipe" +class ProgressData(BaseModel): + current_progress: float = 0 + last_updated_time: float = 0 + + @property + def age(self) -> float: + return time.time() - self.last_updated_time + + def update(self, current_progress: float) -> None: + self.current_progress = current_progress + self.last_updated_time = time.time() + + +class PersistedProgress(BaseModel): + progress: Dict[str, ProgressData] = {} + _lock: threading.Lock = threading.Lock() + + @classmethod + def read(cls, metrics: Metrics) -> "PersistedProgress": + if os.path.exists(PROGRESS_FILE): + with open(PROGRESS_FILE, "r") as file: + try: + return cls.parse_raw(file.read()) + except Exception as ex: + LOG.exception("Failed to read persisted progress file: %r", ex) + metrics.unexpected_exception(ex, where="read_persisted_progress") + return cls() + + def write(self, metrics: Metrics): + with self._lock: + try: + with tempfile.NamedTemporaryFile("w", delete=False, dir=os.path.dirname(PROGRESS_FILE)) as temp_file: + temp_file.write(self.json()) + temp_path = temp_file.name + os.rename(temp_path, PROGRESS_FILE) + except Exception as ex: + LOG.exception("Failed to write persisted progress file: %r", ex) + metrics.unexpected_exception(ex, where="write_persisted_progress") + + def get(self, key: str) -> ProgressData: + self.progress.setdefault(key, ProgressData()) + return self.progress[key] + + def reset(self, key: str, metrics: Metrics) -> None: + if key in self.progress and self.progress[key] != ProgressData(): + del self.progress[key] + self.write(metrics=metrics) + + def create_pgpass_file(connection_string_or_info): """Look up password from the given object which can be a dict or a string and write a possible password in a pgpass file; diff --git a/pghoard/transfer.py b/pghoard/transfer.py index 1af3daf3..30432e6b 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -25,7 +25,7 @@ from rohmu.typing import Metadata from pghoard.common import ( - CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, + CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, get_object_storage_config ) from pghoard.fetcher import FileFetchManager @@ -147,14 +147,32 @@ def untrack_upload_event(self, file_key: str) -> None: def increment(self, file_key: str, total_bytes_uploaded: float) -> None: metric_data = {} + persisted_progress = PersistedProgress.read(metrics=self.metrics) + with self._tracked_events_lock: if file_key not in self._tracked_events: raise Exception(f"UploadEvent for {file_key} is not being tracked.") file_type = self._tracked_events[file_key].file_type if file_type in ( - FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + FileType.Basebackup, + FileType.Basebackup_chunk, + FileType.Basebackup_delta, + FileType.Basebackup_delta_chunk, ): + progress_info = persisted_progress.get(file_key) + if total_bytes_uploaded > progress_info.current_progress: + progress_info.update(total_bytes_uploaded) + persisted_progress.write(metrics=self.metrics) + self.metrics.gauge("pghoard.basebackup_stalled", 0) + else: + stalled_age = progress_info.age + self.metrics.gauge("basebackup_stalled", stalled_age) + self.log.warning( + "Upload for file %s has been stalled for %s seconds.", + file_key, + stalled_age, + ) metric_data = { "metric": "pghoard.basebackup_bytes_uploaded", "inc_value": total_bytes_uploaded, @@ -410,6 +428,13 @@ def run_safe(self): time.monotonic() - start_time ) + if file_to_transfer.operation in {TransferOperation.Upload} and filetype in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ): + if result.success: + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) + self.fetch_manager.stop() self.log.debug("Quitting TransferAgent") @@ -513,6 +538,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent): # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20)) - + if file_to_transfer.file_type in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ): + persisted_progress = PersistedProgress.read(metrics=self.metrics) + persisted_progress.reset(key, metrics=self.metrics) self.transfer_queue.put(file_to_transfer) return None diff --git a/test/test_common.py b/test/test_common.py index d575f79b..4a9e92cc 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -9,13 +9,15 @@ import os from pathlib import Path from typing import Any, Dict +from unittest.mock import patch import pytest from mock.mock import Mock from rohmu.errors import Error +from pghoard import metrics from pghoard.common import ( - TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file, + TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file, extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode, pg_major_version, pg_version_string_to_number, write_json_file ) @@ -88,6 +90,40 @@ def test_json_serialization(self, tmpdir): assert ob2 == ob2_ + def test_persisted_progress(self): + test_progress_file = "test_progress.json" + + test_data = { + "progress": { + "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": { + "current_progress": 100, + "last_updated_time": 1625072042.123456 + } + } + } + + with open(test_progress_file, "w") as file: + json.dump(test_data, file) + + with patch("pghoard.common.PROGRESS_FILE", test_progress_file): + persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == 100 + assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].last_updated_time == 1625072042.123456 + + new_progress = 200 + progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b") + progress_info.update(new_progress) + persisted_progress.write(metrics=metrics.Metrics(statsd={})) + + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" + ].current_progress == new_progress + + os.remove(test_progress_file) + def test_pg_major_version(): assert pg_major_version("10") == "10" diff --git a/test/test_transferagent.py b/test/test_transferagent.py index 703383fb..69a85dbb 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -15,7 +15,7 @@ from rohmu.errors import FileNotFoundFromStorageError, StorageError from pghoard import metrics -from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent +from pghoard.common import (PROGRESS_FILE, CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent) from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker) # pylint: disable=attribute-defined-outside-init @@ -316,3 +316,21 @@ def test_handle_metadata_error(self): evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar") assert evt.success is False assert isinstance(evt.exception, Exception) + + def test_handle_upload_with_persisted_progress(self): + upload_event = UploadEvent( + backup_site_name="test_site", + file_type=FileType.Basebackup, + file_path=Path(self.foo_basebackup_path), + source_data=Path(self.foo_basebackup_path), + metadata={}, + file_size=3, + callback_queue=CallbackQueue(), + remove_after_upload=True + ) + + self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event) + updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) + assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3 + if os.path.exists(PROGRESS_FILE): + os.remove(PROGRESS_FILE)