Merge pull request #615 from Aiven-Open/sebinsunny-refactor-pg-backup-metrics

Add progress-based basebackup metrics
facetoe authored Mar 18, 2024
2 parents 2b2ea98 + 6e00a1c commit 078c81f
Showing 7 changed files with 243 additions and 22 deletions.
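The headline change is a new gauge, pghoard.seconds_since_backup_progress_stalled, emitted while basebackups are snapshotted and uploaded. A minimal, hypothetical consumer-side sketch in Python; only the metric name comes from this diff, the threshold and the check function are illustrative assumptions, not part of pghoard:

    # Hypothetical monitoring check built on the gauge added in this PR.
    # Only the metric name comes from the diff; the threshold and the
    # helper function are illustrative assumptions.
    STALL_ALERT_SECONDS = 1800

    def backup_progress_stalled(gauges: dict) -> bool:
        stalled_for = gauges.get("pghoard.seconds_since_backup_progress_stalled", 0)
        return stalled_for >= STALL_ALERT_SECONDS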
4 changes: 3 additions & 1 deletion pghoard/basebackup/base.py
@@ -30,7 +30,7 @@
from pghoard.basebackup.delta import DeltaBaseBackup
from pghoard.common import (
TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
FileType, NoException, PersistedProgress, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
@@ -564,6 +564,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
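The two added lines above clear any progress persisted by a previous run before a new basebackup starts, so stale entries cannot make the stall gauge fire immediately. A minimal sketch of the same call, assuming pghoard is importable and a Metrics instance built the way the unit tests below build it:

    # Sketch of the reset performed at backup start; Metrics(statsd={}) mirrors
    # what the tests use and is an assumption about your configuration.
    from pghoard.common import PersistedProgress
    from pghoard.metrics import Metrics

    metrics = Metrics(statsd={})
    progress_instance = PersistedProgress()        # fresh, empty progress map
    progress_instance.reset_all(metrics=metrics)   # persist the empty state to disk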
35 changes: 32 additions & 3 deletions pghoard/basebackup/delta.py
@@ -18,15 +18,18 @@

from rohmu import BaseTransfer, rohmufile
from rohmu.dates import now
from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from rohmu.delta.common import (
BackupManifest, BackupPath, ProgressMetrics, ProgressStep, SnapshotFile, SnapshotHash, SnapshotResult,
SnapshotUploadResult
)
from rohmu.delta.snapshot import Snapshotter
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.typing import HasRead, HasSeek

from pghoard.basebackup.chunks import ChunkUploader
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
FileTypePrefixes, PersistedProgress, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
@@ -45,6 +48,8 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):

FilesChunk = Set[Tuple]
SnapshotFiles = Dict[str, SnapshotFile]
PROGRESS_CHECK_INTERVAL = 10
STALLED_PROGRESS_THRESHOLD = 600

EMPTY_FILE_HASH = hashlib.blake2s().hexdigest()

@@ -73,9 +78,33 @@ def __init__(
self.tracked_snapshot_files: SnapshotFiles = self._list_existing_files()
self.chunk_uploader = chunk_uploader
self.data_file_format = data_file_format
self.last_flush_time: float = 0

def _snapshot(self, snapshotter: Snapshotter) -> SnapshotResult:
snapshotter.snapshot(reuse_old_snapshotfiles=False)
def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetrics):
key = "snapshot_progress"
elapsed: float = time.monotonic() - self.last_flush_time
if elapsed > PROGRESS_CHECK_INTERVAL:
persisted_progress = PersistedProgress.read(self.metrics)
progress_info = persisted_progress.get(key)
tags: dict = {"phase": progress_step.value}

if progress_data["handled"] > progress_info.current_progress:
progress_info.update(progress_data["handled"])
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)

if stalled_age >= STALLED_PROGRESS_THRESHOLD:
self.log.warning(
"Snapshot progress for %s has been stalled for %s seconds.", progress_step, stalled_age
)

self.last_flush_time = time.monotonic()
snapshotter.snapshot(reuse_old_snapshotfiles=False, progress_callback=progress_callback)
snapshot_result = SnapshotResult(end=None, state=None, hashes=None)
snapshot_result.state = snapshotter.get_snapshot_state()
snapshot_result.hashes = [
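The callback above flushes snapshot progress at most every PROGRESS_CHECK_INTERVAL seconds and, when the handled count stops growing, reports how long progress has been flat, warning once it passes STALLED_PROGRESS_THRESHOLD. A condensed, self-contained restatement of that pattern; the real callback receives rohmu's ProgressStep and ProgressMetrics and writes to PROGRESS_FILE, which is omitted here for brevity:

    # Condensed stand-in for the progress_callback defined in _snapshot() above.
    # `progress_info` and `metrics` are duck-typed stand-ins for the real objects.
    import logging
    import time

    PROGRESS_CHECK_INTERVAL = 10       # check at most every 10 seconds
    STALLED_PROGRESS_THRESHOLD = 600   # warn once the stall exceeds 10 minutes

    log = logging.getLogger("example")
    last_flush_time = 0.0

    def on_progress(phase: str, handled: float, progress_info, metrics) -> None:
        global last_flush_time
        if time.monotonic() - last_flush_time <= PROGRESS_CHECK_INTERVAL:
            return
        tags = {"phase": phase}
        if handled > progress_info.current_progress:
            progress_info.update(handled)          # store the new value and a timestamp
            last_flush_time = time.monotonic()     # the timer only resets on real progress
            metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
        else:
            stalled_age = progress_info.age        # seconds since the last forward step
            metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)
            if stalled_age >= STALLED_PROGRESS_THRESHOLD:
                log.warning("Snapshot progress for %s stalled for %s seconds", phase, stalled_age)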
69 changes: 69 additions & 0 deletions pghoard/common.py
@@ -15,6 +15,7 @@
import re
import tarfile
import tempfile
import threading
import time
from contextlib import suppress
from dataclasses import dataclass, field
@@ -24,13 +25,16 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel, Field
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil
from pghoard.metrics import Metrics

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

@@ -100,6 +104,71 @@ class BaseBackupMode(StrEnum):
pipe = "pipe"


class ProgressData(BaseModel):
current_progress: float = 0
last_updated_time: float = 0

@property
def age(self) -> float:
return time.time() - self.last_updated_time

def update(self, current_progress: float) -> None:
self.current_progress = current_progress
self.last_updated_time = time.time()


def atomic_write(file_path: str, data: str, temp_dir: Optional[str] = None):
temp_dir = temp_dir or os.path.dirname(file_path)
temp_file = None
try:
with tempfile.NamedTemporaryFile("w", delete=False, dir=temp_dir) as temp_file:
temp_file.write(data)
temp_path = temp_file.name
os.rename(temp_path, file_path)
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to write file atomically: %r", ex)
if temp_file:
with suppress(FileNotFoundError):
os.unlink(temp_file.name)


class PersistedProgress(BaseModel):
progress: Dict[str, ProgressData] = Field(default_factory=dict)
_lock: threading.Lock = threading.Lock()

@classmethod
def read(cls, metrics: Metrics) -> "PersistedProgress":
if os.path.exists(PROGRESS_FILE):
with open(PROGRESS_FILE, "r") as file:
try:
return cls.parse_raw(file.read())
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to read persisted progress file: %r", ex)
metrics.unexpected_exception(ex, where="read_persisted_progress")
return cls()

def write(self, metrics: Metrics):
with self._lock:
try:
data = self.json()
atomic_write(PROGRESS_FILE, data)
except Exception as ex: # pylint: disable=broad-except
metrics.unexpected_exception(ex, where="write_persisted_progress")

def get(self, key: str) -> ProgressData:
self.progress.setdefault(key, ProgressData())
return self.progress[key]

def reset(self, key: str, metrics: Metrics) -> None:
if key in self.progress:
del self.progress[key]
self.write(metrics=metrics)

def reset_all(self, metrics: Metrics) -> None:
self.progress = {}
self.write(metrics=metrics)


def create_pgpass_file(connection_string_or_info):
"""Look up password from the given object which can be a dict or a
string and write a possible password in a pgpass file;
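PersistedProgress keeps a per-key ProgressData map in PROGRESS_FILE and always writes it through atomic_write (temporary file plus os.rename), so a crash cannot leave a half-written JSON file behind. A minimal usage sketch, assuming pghoard is importable and statsd left unconfigured as in the tests:

    # Minimal usage sketch of the new PersistedProgress helper; the key and
    # byte count are illustrative.
    from pghoard.common import PersistedProgress
    from pghoard.metrics import Metrics

    metrics = Metrics(statsd={})

    state = PersistedProgress.read(metrics)      # loads PROGRESS_FILE, or starts empty
    entry = state.get("snapshot_progress")       # creates a ProgressData entry on demand
    entry.update(1024)                           # record progress plus wall-clock time
    state.write(metrics)                         # atomic write: temp file + os.rename

    print(f"stalled for {entry.age:.1f}s")       # age = time.time() - last_updated_time
    state.reset("snapshot_progress", metrics)    # drop the entry once the work finishes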
36 changes: 33 additions & 3 deletions pghoard/transfer.py
@@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
@@ -147,14 +147,33 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
if file_key not in self._tracked_events:
raise Exception(f"UploadEvent for {file_key} is not being tracked.")

file_type = self._tracked_events[file_key].file_type
if file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
FileType.Basebackup,
FileType.Basebackup_chunk,
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age)
if stalled_age >= 600:
self.log.warning(
"Upload for file %s has been stalled for %s seconds.",
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
@@ -410,6 +429,13 @@ def run_safe(self):
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

@@ -513,6 +539,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
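Taken together, these three hunks give each basebackup file key a simple lifecycle: increment() records the upload high-water mark while chunks move, run_safe() drops the entry once the upload succeeds, and handle_upload() drops it before the first couple of retries so a restarted upload does not instantly look stalled. A sketch of that lifecycle with a hypothetical file key:

    # Lifecycle sketch for one upload's persisted progress; the file key and the
    # byte count are hypothetical, the read()/get()/update()/reset() calls come
    # from the diff above.
    from pghoard.common import PersistedProgress
    from pghoard.metrics import Metrics

    metrics = Metrics(statsd={})
    file_key = "example-site/basebackup/chunk_0001"   # hypothetical key

    # While chunks upload, increment() records the high-water mark of bytes sent.
    progress = PersistedProgress.read(metrics=metrics)
    entry = progress.get(file_key)
    entry.update(50 * 1024 * 1024)
    progress.write(metrics=metrics)

    # After a successful upload (or before an early retry) the entry is dropped,
    # so the next attempt starts with a fresh stall timer.
    PersistedProgress.read(metrics=metrics).reset(file_key, metrics=metrics)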
43 changes: 30 additions & 13 deletions test/basebackup/test_delta.py
@@ -442,22 +442,39 @@ def test_upload_single_delta_files_progress(
delta_hashes = {file_hash for _, file_hash in delta_files}

with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(snapshotter, "update_snapshot_file_data"):
mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
with snapshotter.lock:
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]
assert mock_metrics.gauge.mock_calls == expected_calls
with patch("pghoard.basebackup.delta.PROGRESS_CHECK_INTERVAL", new=1):
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
todo_hexdigests=delta_hashes, snapshotter=snapshotter, progress=initial_progress
)
expected_calls = [
mock.call(
"pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "creating_missing_directories"}
),
mock.call("pghoard.seconds_since_backup_progress_stalled", 0, tags={"phase": "adding_missing_files"}),
]

expected_calls += [
mock.call(
"pghoard.seconds_since_backup_progress_stalled",
0,
tags={"phase": "processing_and_hashing_snapshot_files"}
) for _ in range(files_count)
]

expected_calls += [
mock.call(
"pghoard.basebackup_estimated_progress",
initial_progress + (idx + 1) * (100 - initial_progress) / files_count,
tags={"site": "delta"}
) for idx in range(files_count)
]

assert mock_metrics.gauge.mock_calls == expected_calls


def test_upload_single_delta_files(
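The phase tags asserted above come from the ProgressStep values rohmu reports during a delta snapshot; listing them makes it easier to filter the stall gauge by phase on a dashboard. A small reference sketch in which the filter helper is hypothetical:

    # Phase tags observed in the test above; the helper is an assumption, not pghoard API.
    SNAPSHOT_PHASES = (
        "creating_missing_directories",
        "adding_missing_files",
        "processing_and_hashing_snapshot_files",
    )

    def is_snapshot_stall(metric_name: str, tags: dict) -> bool:
        return (
            metric_name == "pghoard.seconds_since_backup_progress_stalled"
            and tags.get("phase") in SNAPSHOT_PHASES
        )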
52 changes: 51 additions & 1 deletion test/test_common.py
@@ -14,8 +14,9 @@
from mock.mock import Mock
from rohmu.errors import Error

from pghoard import metrics
from pghoard.common import (
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)
@@ -88,6 +89,55 @@ def test_json_serialization(self, tmpdir):

assert ob2 == ob2_

def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
assert not tmp_file.exists()

mocker.patch("pghoard.common.PROGRESS_FILE", str(tmp_file))
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))

assert persisted_progress.progress == {}
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

assert tmp_file.exists()
with open(tmp_file, "r") as file:
data = json.load(file)
assert data == {"progress": {}}


def test_pg_major_version():
assert pg_major_version("10") == "10"
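Beyond the round-trip tests above, a stalled entry can be simulated directly through ProgressData, since age is just wall-clock time since last_updated_time. A short sketch in which the 900-second offset is arbitrary:

    # Simulating a stalled entry via ProgressData; the offset is arbitrary.
    import time

    from pghoard.common import ProgressData

    stale = ProgressData(current_progress=100, last_updated_time=time.time() - 900)
    assert stale.age >= 900      # age measures seconds since the last update
    stale.update(200)            # recording progress refreshes last_updated_time
    assert stale.age < 1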