Skip to content

Commit

Permalink
This PR improves monitoring of pg basebackups. During a backup, it re…
Browse files Browse the repository at this point in the history
…gularly checks how much data has been uploaded and compares this to the last recorded amount in a persisted progress file. If the upload is progressing, it updates the record with the new data and current time. If the backup has not advanced compared to the previous value, it reports the time elapsed since the last progress and emits stalled metrics. Once a backup is complete, the record is reset.

[SRE-7631]
  • Loading branch information
sebinsunny committed Feb 8, 2024
1 parent 2b2ea98 commit 68ef95e
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 5 deletions.
53 changes: 53 additions & 0 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import tarfile
import tempfile
import threading
import time
from contextlib import suppress
from dataclasses import dataclass, field
Expand All @@ -24,13 +25,16 @@
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from pydantic import BaseModel
from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil
from pghoard.metrics import Metrics

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
PROGRESS_FILE: Final[str] = "persisted_progress_file.json"

LOG = logging.getLogger("pghoard.common")

Expand Down Expand Up @@ -100,6 +104,55 @@ class BaseBackupMode(StrEnum):
pipe = "pipe"


class ProgressData(BaseModel):
current_progress: float = 0
last_updated_time: float = 0

@property
def age(self) -> float:
return time.time() - self.last_updated_time

def update(self, current_progress: float) -> None:
self.current_progress = current_progress
self.last_updated_time = time.time()


class PersistedProgress(BaseModel):
progress: Dict[str, ProgressData] = {}
_lock: threading.Lock = threading.Lock()

@classmethod
def read(cls, metrics: Metrics) -> "PersistedProgress":
if os.path.exists(PROGRESS_FILE):
with open(PROGRESS_FILE, "r") as file:
try:
return cls.parse_raw(file.read())
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to read persisted progress file: %r", ex)
metrics.unexpected_exception(ex, where="read_persisted_progress")
return cls()

def write(self, metrics: Metrics):
with self._lock:
try:
with tempfile.NamedTemporaryFile("w", delete=False, dir=os.path.dirname(PROGRESS_FILE)) as temp_file:
temp_file.write(self.json())
temp_path = temp_file.name
os.rename(temp_path, PROGRESS_FILE)
except Exception as ex: # pylint: disable=broad-except
LOG.exception("Failed to write persisted progress file: %r", ex)
metrics.unexpected_exception(ex, where="write_persisted_progress")

def get(self, key: str) -> ProgressData:
self.progress.setdefault(key, ProgressData())
return self.progress[key]

def reset(self, key: str, metrics: Metrics) -> None:
if key in self.progress and self.progress[key] != ProgressData():
del self.progress[key]
self.write(metrics=metrics)


def create_pgpass_file(connection_string_or_info):
"""Look up password from the given object which can be a dict or a
string and write a possible password in a pgpass file;
Expand Down
35 changes: 32 additions & 3 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
get_object_storage_config
)
from pghoard.fetcher import FileFetchManager
Expand Down Expand Up @@ -147,14 +147,32 @@ def untrack_upload_event(self, file_key: str) -> None:

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
if file_key not in self._tracked_events:
raise Exception(f"UploadEvent for {file_key} is not being tracked.")

file_type = self._tracked_events[file_key].file_type
if file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
FileType.Basebackup,
FileType.Basebackup_chunk,
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.basebackup_stalled", 0)
else:
stalled_age = progress_info.age
self.metrics.gauge("basebackup_stalled", stalled_age)
self.log.warning(
"Upload for file %s has been stalled for %s seconds.",
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
Expand Down Expand Up @@ -410,6 +428,13 @@ def run_safe(self):
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

Expand Down Expand Up @@ -513,6 +538,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))

if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
38 changes: 37 additions & 1 deletion test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
import os
from pathlib import Path
from typing import Any, Dict
from unittest.mock import patch

import pytest
from mock.mock import Mock
from rohmu.errors import Error

from pghoard import metrics
from pghoard.common import (
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)
Expand Down Expand Up @@ -88,6 +90,40 @@ def test_json_serialization(self, tmpdir):

assert ob2 == ob2_

def test_persisted_progress(self):
test_progress_file = "test_progress.json"

test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": 1625072042.123456
}
}
}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

with patch("pghoard.common.PROGRESS_FILE", test_progress_file):
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress

os.remove(test_progress_file)


def test_pg_major_version():
assert pg_major_version("10") == "10"
Expand Down
20 changes: 19 additions & 1 deletion test/test_transferagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from rohmu.errors import FileNotFoundFromStorageError, StorageError

from pghoard import metrics
from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
from pghoard.common import (PROGRESS_FILE, CallbackEvent, CallbackQueue, FileType, PersistedProgress, QuitEvent)
from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)

# pylint: disable=attribute-defined-outside-init
Expand Down Expand Up @@ -316,3 +316,21 @@ def test_handle_metadata_error(self):
evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
assert evt.success is False
assert isinstance(evt.exception, Exception)

def test_handle_upload_with_persisted_progress(self):
upload_event = UploadEvent(
backup_site_name="test_site",
file_type=FileType.Basebackup,
file_path=Path(self.foo_basebackup_path),
source_data=Path(self.foo_basebackup_path),
metadata={},
file_size=3,
callback_queue=CallbackQueue(),
remove_after_upload=True
)

self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
if os.path.exists(PROGRESS_FILE):
os.remove(PROGRESS_FILE)

0 comments on commit 68ef95e

Please sign in to comment.