Skip to content

Commit

Permalink
This PR improves monitoring of pg basebackups. During a backup, it re…
Browse files Browse the repository at this point in the history
…gularly checks how much data has been uploaded and compares this to the last recorded amount in a persisted progress file. If the upload is progressing, it updates the record with the new data and current time. If the backup has not advanced compared to the previous value, it reports the time elapsed since the last progress and emits stalled metrics. Once a backup is complete, the record is reset.

[SRE-7631]
  • Loading branch information
sebinsunny committed Mar 14, 2024
1 parent 2b2ea98 commit 4a900f1
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 9 deletions.
4 changes: 3 additions & 1 deletion pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pghoard.basebackup.delta import DeltaBaseBackup
from pghoard.common import (
TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
Expand Down Expand Up @@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
Expand Down
30 changes: 27 additions & 3 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@

from rohmu import BaseTransfer, rohmufile
from rohmu.dates import now
from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from rohmu.delta.common import (
BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult,
SnapshotUploadResult
)
from rohmu.delta.snapshot import Snapshotter
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.typing import HasRead, HasSeek

from pghoard.basebackup.chunks import ChunkUploader
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
Expand Down Expand Up @@ -73,9 +76,30 @@ def __init__(
self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files()
self.chunk_uploader = chunk_uploader
self.data_file_format = data_file_format
self.last_flush_time: float = 0

def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult:
snapshotter.snapshot(reuse_old_snapshotfiles=False)
def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics):
key = "snapshot_progress"
persisted_progress = PersistedProgress.read(self.metrics)
progress_info = persisted_progress.get(key)
tags: dict = {"phase": progress_step.value}
if progress_data["handled"] > progress_info.current_progress:
progress_info.update(progress_data["handled"])
elapsed: float = time.monotonic() - self.last_flush_time
if elapsed > 10:
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)

if stalled_age >= 600:
self.log.warning("Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age)

self.last_flush_time = time.monotonic()
snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback)
snapshot_result = SnapshotResult(end=None, state=None, hashes=None)
snapshot_result.state = snapshotter.get_snapshot_state()
snapshot_result.hashes = [
Expand Down
69 changes: 69 additions & 0 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import tarfile
import tempfile
import threading
import time
from contextlib import suppress
from dataclasses import dataclass, field
Expand All @@ -24,13 +25,16 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel, Field
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil
from pghoard.metrics import Metrics

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

Expand Down Expand Up @@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum):
pipe = "pipe"


class ProgressData(BaseModel):
current_progress: float = 0
last_updated_time: float = 0

@property
def age(self) -> float:
return time.time() - self.last_updated_time

def update(self, current_progress: float) -> None:
self.current_progress = current_progress
self.last_updated_time = time.time()


def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None):
temp_dir = temp_dir or os.path.dirname(file_path)
temp_file = None
try:
with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file:
temp_file.write(data)
temp_path = temp_file.name
os.rename(temp_path, file_path)
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to write file atomically: %r", ex)
if temp_file:
with suppress(FileNotFoundError):
os.unlink(temp_file.name)


class PersistedProgress(BaseModel):
progress: Dict[str, ProgressData] = Field(default_factory=dict)
_lock: threading.Lock = threading.Lock()

@classmethod
def read(cls, metrics: Metrics) -> "PersistedProgress":
if os.path.exists(PROGRESS_FILE):
with open(PROGRESS_FILE, "r") as file:
try:
return cls.parse_raw(file.read())
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to read persisted progress file: %r", ex)
metrics.unexpected_exception(ex, where="read_persisted_progress")
return cls()

def write(self, metrics: Metrics):
with self._lock:
try:
data = self.json()
atomic_write(PROGRESS_FILE, data)
except Exception as ex: # pylint: disable=broad-except
metrics.unexpected_exception(ex, where="write_persisted_progress")

def get(self, key: str) -> ProgressData:
self.progress.setdefault(key, ProgressData())
return self.progress[key]

def reset(self, key: str, metrics: Metrics) -> None:
if key in self.progress:
del self.progress[key]
self.write(metrics=metrics)

def reset_all(self, metrics: Metrics) -> None:
self.progress = {}
self.write(metrics=metrics)


def create_pgpass_file(connection_string_or_info):
"""Look up password from the given object which can be a dict or a
string and write a possible password in a pgpass file;
Expand Down
36 changes: 33 additions & 3 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
Expand Down Expand Up @@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
if file_key not in self._tracked_events:
raise Exception(f"UploadEvent for {file_key} is not being tracked.")

file_type = self._tracked_events[file_key].file_type
if file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
FileType.Basebackup,
FileType.Basebackup_chunk,
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age)
if stalled_age >= 600:
self.log.warning(
"Upload for file %s has been stalled for %s seconds.",
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
Expand Down Expand Up @@ -410,6 +429,13 @@ def run_safe(self):
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

Expand Down Expand Up @@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
52 changes: 51 additions & 1 deletion test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
from mock.mock import Mock
from rohmu.errors import Error

from pghoard import metrics
from pghoard.common import (
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)
Expand Down Expand Up @@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):

assert ob2 == ob2_

def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
assert not tmp_file.exists()

mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))

assert persisted_progress.progress == {}
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

assert tmp_file.exists()
with open(tmp_file, "r") as file:
data = json.load(file)
assert data == {"progress": {}}


def test_pg_major_version():
assert pg_major_version("10") == "10"
Expand Down
26 changes: 25 additions & 1 deletion test/test_transferagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from rohmu.errors import FileNotFoundFromStorageError, StorageError

from pghoard import metrics
from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
from pghoard.common import (CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent)
from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)

# pylint: disable=attribute-defined-outside-init
Expand Down Expand Up @@ -316,3 +316,27 @@ def test_handle_metadata_error(self):
evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
assert evt.success is False
assert isinstance(evt.exception, Exception)

def test_handle_upload_with_persisted_progress(self, mocker, tmp_path):

temp_progress_file = tmp_path / "test_progress.json"
assert not temp_progress_file.exists()

mocker.patch("pghoard.common.PROGRESS_FILE", temp_progress_file)
upload_event = UploadEvent(
backup_site_name="test_site",
file_type=FileType.Basebackup,
file_path=Path(self.foo_basebackup_path),
source_data=Path(self.foo_basebackup_path),
metadata={},
file_size=3,
callback_queue=CallbackQueue(),
remove_after_upload=True
)

self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert temp_progress_file.exists()
assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
if temp_progress_file.exists():
temp_progress_file.unlink()

0 comments on commit 4a900f1

Please sign in to comment.