Skip to content

Commit

Permalink
This PR improves monitoring of pg basebackups.
Browse files Browse the repository at this point in the history
During a backup, it regularly checks how much data has been uploaded and compares this to the last recorded amount in a persisted progress file. If the upload is progressing, it updates the record with the new data and the current time. If the backup has not advanced compared to the previous value, it reports the time elapsed since the last progress and emits stalled metrics. Once a backup is complete, the record is reset.

[SRE-7631]
  • Loading branch information
sebinsunny committed Mar 18, 2024
1 parent 2b2ea98 commit 6e00a1c
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 22 deletions.
4 changes: 3 additions & 1 deletion pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pghoard.basebackup.delta import DeltaBaseBackup
from pghoard.common import (
TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
Expand Down Expand Up @@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
Expand Down
35 changes: 32 additions & 3 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@

from rohmu import BaseTransfer, rohmufile
from rohmu.dates import now
from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from rohmu.delta.common import (
BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult,
SnapshotUploadResult
)
from rohmu.delta.snapshot import Snapshotter
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.typing import HasRead, HasSeek

from pghoard.basebackup.chunks import ChunkUploader
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
Expand All @@ -45,6 +48,8 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):

FilesChunk = Set[Tuple]
SnapshotFiles = Dict[str, SnapshotFile]
PROGRESS_CHECK_INTERVAL = 10
STALLED_PROGRESS_THRESHOLD = 600

EMPTY_FILE_HASH = hashlib.blake2s().hexdigest()

Expand Down Expand Up @@ -73,9 +78,33 @@ def __init__(
self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files()
self.chunk_uploader = chunk_uploader
self.data_file_format = data_file_format
self.last_flush_time: float = 0

def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult:
snapshotter.snapshot(reuse_old_snapshotfiles=False)
def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics):
key = "snapshot_progress"
elapsed: float = time.monotonic() - self.last_flush_time
if elapsed > PROGRESS_CHECK_INTERVAL:
persisted_progress = PersistedProgress.read(self.metrics)
progress_info = persisted_progress.get(key)
tags: dict = {"phase": progress_step.value}

if progress_data["handled"] > progress_info.current_progress:
progress_info.update(progress_data["handled"])
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)

if stalled_age >= STALLED_PROGRESS_THRESHOLD:
self.log.warning(
"Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age
)

self.last_flush_time = time.monotonic()
snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback)
snapshot_result = SnapshotResult(end=None, state=None, hashes=None)
snapshot_result.state = snapshotter.get_snapshot_state()
snapshot_result.hashes = [
Expand Down
69 changes: 69 additions & 0 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import tarfile
import tempfile
import threading
import time
from contextlib import suppress
from dataclasses import dataclass, field
Expand All @@ -24,13 +25,16 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel, Field
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil
from pghoard.metrics import Metrics

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

Expand Down Expand Up @@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum):
pipe = "pipe"


class ProgressData(BaseModel):
    """Progress record for a single tracked key (e.g. an upload or snapshot phase).

    Stores the last observed progress value and the wall-clock time it advanced,
    so stall detection can compute how long progress has been flat.
    """

    # Last recorded progress value (e.g. bytes uploaded); 0 means nothing recorded yet.
    current_progress: float = 0
    # Wall-clock timestamp (time.time()) of the last advance; 0 means "never
    # updated", which makes `age` very large until the first update() call.
    last_updated_time: float = 0

    @property
    def age(self) -> float:
        """Seconds of wall-clock time elapsed since the last recorded advance."""
        return time.time() - self.last_updated_time

    def update(self, current_progress: float) -> None:
        """Record a new progress value and refresh the timestamp to now."""
        self.current_progress = current_progress
        self.last_updated_time = time.time()


def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None):
    """Write *data* to *file_path* atomically via a temporary file.

    The data is written to a NamedTemporaryFile in *temp_dir* (defaulting to the
    target's own directory so the final rename cannot cross filesystems), flushed
    and fsynced, and only after the temp file is closed is it moved over
    *file_path* with os.replace(). Readers therefore never observe a partially
    written file, and the publish step is atomic on both POSIX and Windows.

    Failures are logged and swallowed (best effort, matching the original
    contract); the temporary file is cleaned up on error.
    """
    temp_dir = temp_dir or os.path.dirname(file_path)
    temp_file = None
    try:
        with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file:
            temp_file.write(data)
            # Force the data out of user-space buffers and onto disk *before*
            # publishing the file; renaming while the buffer was still unflushed
            # let readers see an empty/partial file at file_path.
            temp_file.flush()
            os.fsync(temp_file.fileno())
        # os.replace() atomically overwrites an existing target on all
        # platforms (os.rename() raises on Windows when the target exists).
        os.replace(temp_file.name, file_path)
    except Exception as ex:  # pylint: disable=broad-except
        LOG.exception("Failed to write file atomically: %r", ex)
        if temp_file:
            with suppress(FileNotFoundError):
                os.unlink(temp_file.name)


class PersistedProgress(BaseModel):
    """Backup/upload progress state persisted as JSON in PROGRESS_FILE.

    Maps an arbitrary string key (a file key or snapshot phase) to its
    ProgressData so stalled-backup detection survives process restarts.
    """

    progress: Dict[str, ProgressData] = Field(default_factory=dict)
    # NOTE(review): declared at class level, so every PersistedProgress instance
    # shares this single lock (pydantic excludes underscore-prefixed attributes
    # from model fields). write() is thus serialized process-wide, but read()
    # takes no lock at all — confirm concurrent read-during-write is acceptable.
    _lock: threading.Lock = threading.Lock()

    @classmethod
    def read(cls, metrics: Metrics) -> "PersistedProgress":
        """Load persisted progress from PROGRESS_FILE.

        Returns a fresh empty instance when the file is missing or unparseable;
        parse failures are logged and reported via metrics rather than raised
        (best effort — a corrupt file simply resets stall tracking).
        """
        if os.path.exists(PROGRESS_FILE):
            with open(PROGRESS_FILE, "r") as file:
                try:
                    return cls.parse_raw(file.read())
                except Exception as ex:  # pylint: disable=broad-except
                    LOG.exception("Failed to read persisted progress file: %r", ex)
                    metrics.unexpected_exception(ex, where="read_persisted_progress")
        return cls()

    def write(self, metrics: Metrics):
        """Serialize this instance to PROGRESS_FILE via atomic_write().

        Runs under the shared lock; failures are reported to metrics instead of
        raised, so callers never fail a backup because progress tracking broke.
        """
        with self._lock:
            try:
                data = self.json()
                atomic_write(PROGRESS_FILE, data)
            except Exception as ex:  # pylint: disable=broad-except
                metrics.unexpected_exception(ex, where="write_persisted_progress")

    def get(self, key: str) -> ProgressData:
        """Return the ProgressData for *key*, creating an empty in-memory entry
        if absent (not persisted to disk until write() is called)."""
        self.progress.setdefault(key, ProgressData())
        return self.progress[key]

    def reset(self, key: str, metrics: Metrics) -> None:
        """Drop the entry for *key* (if present) and persist the change.

        Called once an upload completes so a finished file no longer counts
        toward stall detection.
        """
        if key in self.progress:
            del self.progress[key]
            self.write(metrics=metrics)

    def reset_all(self, metrics: Metrics) -> None:
        """Clear all tracked progress and persist the now-empty state.

        Used at the start of a new basebackup to discard records from any
        previous run.
        """
        self.progress = {}
        self.write(metrics=metrics)


def create_pgpass_file(connection_string_or_info):
"""Look up password from the given object which can be a dict or a
string and write a possible password in a pgpass file;
Expand Down
36 changes: 33 additions & 3 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
Expand Down Expand Up @@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
if file_key not in self._tracked_events:
raise Exception(f"UploadEvent for {file_key} is not being tracked.")

file_type = self._tracked_events[file_key].file_type
if file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
FileType.Basebackup,
FileType.Basebackup_chunk,
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age)
if stalled_age >= 600:
self.log.warning(
"Upload for file %s has been stalled for %s seconds.",
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
Expand Down Expand Up @@ -410,6 +429,13 @@ def run_safe(self):
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

Expand Down Expand Up @@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
43 changes: 30 additions & 13 deletions test/basebackup/test_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,22 +442,39 @@ def test_upload_single_delta_files_progress(
delta_hashes = {file_hash for _, file_hash in delta_files}

with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(snapshotter, "update_snapshot_file_data"):
mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
with snapshotter.lock:
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]
assert mock_metrics.gauge.mock_calls == expected_calls
with patch("pghoard.basebackup.delta.PROGRESS_CHECK_INTERVAL", new=1):
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "creating_missing_directories"}
),
mock.call("pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "adding_missing_files"}),
]

expected_calls += [
mock.call(
"pghoard.seconds_since_backup_progress_stalled",
0,
tags={"phase": "processing_and_hashing_snapshot_files"}
) for _ in range(files_count)
]

expected_calls = [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]

assert mock_metrics.gauge.mock_calls == expected_calls


def test_upload_single_delta_files(
Expand Down
52 changes: 51 additions & 1 deletion test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
from mock.mock import Mock
from rohmu.errors import Error

from pghoard import metrics
from pghoard.common import (
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)
Expand Down Expand Up @@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):

assert ob2 == ob2_

def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
assert not tmp_file.exists()

mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))

assert persisted_progress.progress == {}
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

assert tmp_file.exists()
with open(tmp_file, "r") as file:
data = json.load(file)
assert data == {"progress": {}}


def test_pg_major_version():
assert pg_major_version("10") == "10"
Expand Down
Loading

0 comments on commit 6e00a1c

Please sign in to comment.