diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py index 76c97cb1..481526ab 100644 --- a/pghoard/basebackup/base.py +++ b/pghoard/basebackup/base.py @@ -316,9 +316,6 @@ def run_piped_basebackup(self): }) metadata.update(self.metadata) - def callback(n_bytes: int) -> None: - self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False}) - self.transfer_queue.put( UploadEvent( file_type=FileType.Basebackup, @@ -326,7 +323,6 @@ def callback(n_bytes: int) -> None: file_path=basebackup_path, callback_queue=self.callback_queue, file_size=compressed_file_size, - incremental_progress_callback=callback, source_data=stream_target, remove_after_upload=True, metadata=metadata @@ -615,7 +611,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = data_file_format, compressed_base, delta_stats=delta_stats, - delta=delta, file_type=FileType.Basebackup_chunk ) @@ -725,7 +720,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = callback_queue=self.callback_queue, chunk_path=Path(data_file_format(0)), # pylint: disable=too-many-format-args temp_dir=compressed_base, - delta=delta, files_to_backup=control_files, file_type=FileType.Basebackup, extra_metadata={ diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py index 575fd723..acf5a704 100644 --- a/pghoard/basebackup/chunks.py +++ b/pghoard/basebackup/chunks.py @@ -136,7 +136,6 @@ def tar_one_file( chunk_path, files_to_backup, callback_queue: CallbackQueue, - delta: bool, file_type: FileType = FileType.Basebackup_chunk, extra_metadata: Optional[Dict[str, Any]] = None, delta_stats: Optional[DeltaStats] = None @@ -193,9 +192,6 @@ def tar_one_file( middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type) - def callback(n_bytes: int) -> None: - self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta}) - self.transfer_queue.put( UploadEvent( callback_queue=callback_queue, @@ -204,7 +200,6 @@ def callback(n_bytes: int) -> None: file_path=middle_path / chunk_name, source_data=chunk_path, metadata=metadata, - incremental_progress_callback=callback, backup_site_name=self.site, ) ) @@ -220,7 +215,6 @@ def handle_single_chunk( chunks, index: int, temp_dir: Path, - delta: bool, delta_stats: Optional[DeltaStats] = None, file_type: FileType = FileType.Basebackup_chunk ) -> Dict[str, Any]: @@ -228,7 +222,6 @@ def handle_single_chunk( chunk_name, input_size, result_size = self.tar_one_file( callback_queue=chunk_callback_queue, chunk_path=chunk_path, - delta=delta, temp_dir=temp_dir, files_to_backup=one_chunk_files, delta_stats=delta_stats, @@ -269,7 +262,6 @@ def create_and_upload_chunks( chunks, data_file_format: Callable[[int], str], temp_base_dir: Path, - delta: bool, delta_stats: Optional[DeltaStats] = None, file_type: FileType = FileType.Basebackup_chunk, chunks_max_progress: float = 100.0 @@ -306,7 +298,6 @@ def create_and_upload_chunks( chunks=chunks, index=i, temp_dir=temp_base_dir, - delta=delta, delta_stats=delta_stats, file_type=file_type ) diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py index edb3b353..3acdc488 100644 --- a/pghoard/basebackup/delta.py +++ b/pghoard/basebackup/delta.py @@ -190,9 +190,6 @@ def progress_callback(n_bytes: int = 1) -> None: dest_path = Path("basebackup_delta") / result_digest - def callback(n_bytes: int) -> None: - self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True}) - self.transfer_queue.put( UploadEvent( callback_queue=callback_queue, @@ -201,7 +198,6 @@ def callback(n_bytes: int) -> None: backup_site_name=self.site, metadata=metadata, file_path=dest_path, - incremental_progress_callback=callback, source_data=chunk_path ) ) @@ -363,7 +359,6 @@ def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[Uplo chunk_files = self.chunk_uploader.create_and_upload_chunks( chunks=delta_chunks, data_file_format=self.data_file_format, - delta=True, temp_base_dir=self.compressed_base, file_type=FileType.Basebackup_delta_chunk, chunks_max_progress=chunks_max_progress, diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index a2a04a3b..396ecda1 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -45,7 +45,7 @@ is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation ) from pghoard.receivexlog import PGReceiveXLog -from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent +from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker) from pghoard.walreceiver import WALReceiver from pghoard.webserver import WebServer @@ -143,6 +143,9 @@ def __init__(self, config_path): self.requested_basebackup_sites = set() self.inotify_adapter = InotifyAdapter(self.compression_queue) self.inotify = InotifyWatcher(self.inotify_adapter) + + self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics) + self.webserver = WebServer( self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics ) @@ -167,6 +170,7 @@ def __init__(self, config_path): config=self.config, mp_manager=self.mp_manager, transfer_queue=self.transfer_queue, + upload_tracker=self.upload_tracker, metrics=self.metrics, shared_state_dict=self.transfer_agent_state ) @@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self): def start_threads_on_startup(self): # Startup threads self.inotify.start() + self.upload_tracker.start() self.webserver.start() self.wal_file_deleter.start() for compressor in self.compressors: @@ -971,7 +976,11 @@ def load_config(self, _signal=None, _frame=None): # pylint: disable=unused-argu def _get_all_threads(self): all_threads = [] - # on first config load webserver isn't initialized yet + + # on first config load upload_tracker and webserver aren't initialized yet + if hasattr(self, "upload_tracker"): + all_threads.append(self.upload_tracker) + if hasattr(self, "webserver"): all_threads.append(self.webserver) all_threads.extend(self.basebackups.values()) diff --git a/pghoard/transfer.py b/pghoard/transfer.py index c1510480..4efd1658 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -8,24 +8,27 @@ import enum import logging import os +import threading import time from contextlib import suppress from dataclasses import dataclass +from functools import partial from io import BytesIO from pathlib import Path from queue import Empty from threading import Lock -from typing import Any, BinaryIO, Dict, Optional, Union +from typing import Any, BinaryIO, Dict, List, Optional, Union from rohmu import get_transfer from rohmu.errors import FileNotFoundFromStorageError -from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType) +from rohmu.object_storage.base import BaseTransfer from pghoard.common import ( CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file, get_object_storage_config ) from pghoard.fetcher import FileFetchManager +from pghoard.metrics import Metrics _STATS_LOCK = Lock() _last_stats_transmit_time = 0 @@ -54,7 +57,6 @@ class UploadEvent(BaseTransferEvent): file_size: Optional[int] remove_after_upload: bool = True retry_number: int = 0 - incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None @property def operation(self): @@ -99,8 +101,148 @@ def operation(self): TransferQueue = Queue +@dataclass +class TransferIncrement: + n_bytes: float + tracked_at: float = dataclasses.field(default_factory=time.monotonic) + + +@dataclass +class UploadEventProgress: + key: str + file_size: Optional[int] + file_type: FileType + increments: List[TransferIncrement] = dataclasses.field(default_factory=list) + started_at: float = dataclasses.field(default_factory=time.monotonic) + + def is_completed(self) -> bool: + # UploadEvents without file_size are allowed, therefore we cannot determine if the event is completed + if not self.file_size: + return False + + total_nbytes = sum(increment.n_bytes for increment in self.increments) + return total_nbytes >= self.file_size + + +class UploadEventProgressTracker(PGHoardThread): + CHECK_FREQUENCY = 5. # check every 5 seconds for progress + WARNING_TIMEOUT = 5. * 60 # log a warning in case there is no progress during last 5 minutes + + def __init__(self, metrics: Metrics) -> None: + self.metrics = metrics + self.log = logging.getLogger("UploadEventProgressTracker") + + self.running: bool = False + + self._tracked_events: Dict[str, UploadEventProgress] = {} + self._tracked_events_lock = threading.Lock() + self.log.debug("UploadEventProgressTracker initialized") + + super().__init__() + + def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None: + with self._tracked_events_lock: + self.log.info("Tracking upload event for file %s", file_key) + self._tracked_events[file_key] = UploadEventProgress(key=file_key, file_type=file_type, file_size=file_size) + + def untrack_upload_event(self, file_key: str) -> None: + if file_key not in self._tracked_events: + return + + with self._tracked_events_lock: + self._tracked_events.pop(file_key) + + def increment(self, file_key: str, n_bytes: float) -> None: + with self._tracked_events_lock: + if file_key not in self._tracked_events: + raise Exception(f"UploadEvent for {file_key} is not being tracked.") + + file_type = self._tracked_events[file_key].file_type + if file_type in ( + FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk + ): + delta = file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk) + self.metrics.increase( + "pghoard.basebackup_bytes_uploaded", + inc_value=n_bytes, + tags={"delta": delta}, + ) + elif file_type in (FileType.Wal, FileType.Timeline): + self.metrics.increase("pghoard.compressed_file_upload", inc_value=n_bytes) + + self._tracked_events[file_key].increments.append(TransferIncrement(n_bytes=n_bytes)) + + # if the file is fully upload, then stop tracking it + if self._tracked_events[file_key].is_completed(): + self._tracked_events.pop(file_key) + + def reset(self) -> None: + with self._tracked_events_lock: + self._tracked_events = {} + self.running = False + + def run_safe(self): + try: + self.running = True + + while self.running: + with self._tracked_events_lock: + self._check_increment_rate() + + time.sleep(self.CHECK_FREQUENCY) + except Exception: # pylint: disable=broad-except + self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload") + self.metrics.increase("pghoard.transfer_operation.errors") + self.reset() + self.stop() + + self.log.debug("Quitting UploadEventProgressTracker") + + def stop(self) -> None: + self.running = False + + def _check_increment_rate(self) -> None: + """ + Check if the transfer operation is progressing by comparing the time elapsed since + last increment with the average time it took for previous increments. If the operation has been inactive, + a warning will be logged. + """ + now = time.monotonic() + for ue_progress in self._tracked_events.values(): + if ue_progress.is_completed(): + continue + + last_increment_at = ue_progress.started_at + avg_rate = 0. + + if ue_progress.increments: + # total "waiting" time between all increments + total_increment_diff = sum( + next_inc.tracked_at - prev_inc.tracked_at + for prev_inc, next_inc in zip(ue_progress.increments, ue_progress.increments[1:]) + ) + if len(ue_progress.increments) > 1: + avg_rate = total_increment_diff / (len(ue_progress.increments) - 1) + last_increment_at = ue_progress.increments[-1].tracked_at + + # log warning in case we have not tracked any progress for the operation since + # the last check + if last_increment_at and (now - last_increment_at) >= avg_rate + self.WARNING_TIMEOUT: + self.log.warning( + "Upload for file %s has been inactive since %s seconds.", ue_progress.key, now - last_increment_at + ) + + class TransferAgent(PGHoardThread): - def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict): + def __init__( + self, + config, + mp_manager, + transfer_queue: TransferQueue, + upload_tracker: UploadEventProgressTracker, + metrics: Metrics, + shared_state_dict, + ): super().__init__() self.log = logging.getLogger("TransferAgent") self.config = config @@ -108,6 +250,7 @@ def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, s self.mp_manager = mp_manager self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage) self.transfer_queue = transfer_queue + self.upload_tracker = upload_tracker self.running = True self.sleep = time.sleep self.state = shared_state_dict @@ -324,9 +467,20 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent): metadata = file_to_transfer.metadata.copy() if file_to_transfer.file_size: metadata["Content-Length"] = str(file_to_transfer.file_size) + self.upload_tracker.track_upload_event( + file_key=key, + file_type=file_to_transfer.file_type, + file_size=file_to_transfer.file_size, + ) + upload_progress_fn = partial(self.upload_tracker.increment, file_key=key) storage.store_file_object( - key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback + key, + f, + metadata=metadata, + upload_progress_fn=lambda n_bytes: upload_progress_fn(n_bytes=n_bytes), ) + # make sure we untrack it manually + self.upload_tracker.untrack_upload_event(key) if unlink_local: if isinstance(file_to_transfer.source_data, Path): try: diff --git a/test/test_transferagent.py b/test/test_transferagent.py index 713bb550..703383fb 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -8,14 +8,15 @@ import time from pathlib import Path from queue import Empty, Queue -from unittest.mock import ANY, Mock, patch +from unittest.mock import Mock, patch import pytest +from _pytest.logging import LogCaptureFixture from rohmu.errors import FileNotFoundFromStorageError, StorageError from pghoard import metrics -from pghoard.common import CallbackEvent, FileType, QuitEvent -from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent) +from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent +from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker) # pylint: disable=attribute-defined-outside-init from .base import PGHoardTestCase @@ -40,6 +41,31 @@ def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimet raise StorageError("foo") +class MockStorageNetworkThrottle(MockStorage): + """ + Storage simulating network throttling when uploading file chunks to object storage. + """ + NUM_CHUNKS = 4 + INCREMENT_WAIT_PER_CHUNK = [0.1, 0.1, 1, 0.1] + + # pylint: disable=unused-argument + def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None): + file_size = int(metadata["Content-Length"]) if "Content-Length" in metadata else None + if not file_size: + return + + chunk_size = round(file_size / self.NUM_CHUNKS) + for chunk_num in range(self.NUM_CHUNKS): + time.sleep(self.INCREMENT_WAIT_PER_CHUNK[chunk_num]) + if upload_progress_fn: + upload_progress_fn(chunk_size) + + +class PatchedUploadEventProgressTracker(UploadEventProgressTracker): + CHECK_FREQUENCY = .2 + WARNING_TIMEOUT = .5 + + class TestTransferAgent(PGHoardTestCase): def setup_method(self, method): super().setup_method(method) @@ -66,16 +92,22 @@ def setup_method(self, method): self.compression_queue = Queue() self.transfer_queue = Queue() + self.upload_tracker = PatchedUploadEventProgressTracker(metrics=metrics.Metrics(statsd={})) + self.upload_tracker.start() + self.transfer_agent = TransferAgent( config=self.config, mp_manager=None, transfer_queue=self.transfer_queue, + upload_tracker=self.upload_tracker, metrics=metrics.Metrics(statsd={}), shared_state_dict={} ) self.transfer_agent.start() def teardown_method(self, method): + self.upload_tracker.stop() + self.upload_tracker.join() self.transfer_agent.running = False self.transfer_queue.put(QuitEvent) self.transfer_agent.join() @@ -85,7 +117,7 @@ def _inject_prefix(self, prefix): self.config["backup_sites"][self.test_site]["prefix"] = prefix def test_handle_download(self): - callback_queue = Queue() + callback_queue = CallbackQueue() # Check the local storage fails and returns correctly self.transfer_queue.put( DownloadEvent( @@ -122,7 +154,7 @@ def test_handle_download(self): storage.get_contents_to_string.assert_called_with(expected_key) def test_handle_upload_xlog(self): - callback_queue = Queue() + callback_queue = CallbackQueue() storage = Mock() self.transfer_agent.get_object_storage = lambda x: storage assert os.path.exists(self.foo_path) is True @@ -141,44 +173,42 @@ def test_handle_upload_xlog(self): assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3}) assert os.path.exists(self.foo_path) is True expected_key = os.path.join(self.test_site, "xlog/00000001000000000000000C") - storage.store_file_object.assert_called_with( - expected_key, - ANY, - metadata={ - "Content-Length": "3", - "start-wal-segment": "00000001000000000000000C" - }, - upload_progress_fn=None - ) + + assert storage.store_file_object.call_count == 1 + assert storage.store_file_object.call_args[0][0] == expected_key + assert storage.store_file_object.call_args[1]["metadata"] == { + "Content-Length": "3", + "start-wal-segment": "00000001000000000000000C" + } + # Now check that the prefix is used. self._inject_prefix("site_specific_prefix") self.transfer_queue.put( UploadEvent( callback_queue=callback_queue, file_type=FileType.Wal, - file_path=Path("xlog/00000001000000000000000C"), + file_path=Path("xlog/00000001000000000000000D"), file_size=3, remove_after_upload=True, source_data=Path(self.foo_path), - metadata={"start-wal-segment": "00000001000000000000000C"}, + metadata={"start-wal-segment": "00000001000000000000000D"}, backup_site_name=self.test_site ) ) assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3}) - expected_key = "site_specific_prefix/xlog/00000001000000000000000C" - storage.store_file_object.assert_called_with( - expected_key, - ANY, - metadata={ - "Content-Length": "3", - "start-wal-segment": "00000001000000000000000C" - }, - upload_progress_fn=None - ) + expected_key = "site_specific_prefix/xlog/00000001000000000000000D" + + assert storage.store_file_object.call_count == 2 + assert storage.store_file_object.call_args[0][0] == expected_key + assert storage.store_file_object.call_args[1]["metadata"] == { + "Content-Length": "3", + "start-wal-segment": "00000001000000000000000D" + } + assert os.path.exists(self.foo_path) is False def test_handle_upload_basebackup(self): - callback_queue = Queue() + callback_queue = CallbackQueue() storage = Mock() self.transfer_agent.get_object_storage = storage assert os.path.exists(self.foo_path) is True @@ -204,7 +234,7 @@ def sleep(sleep_amount): sleeps.append(sleep_amount) time.sleep(0.001) - callback_queue = Queue() + callback_queue = CallbackQueue() storage = MockStorageRaising() self.transfer_agent.sleep = sleep self.transfer_agent.get_object_storage = storage @@ -229,6 +259,30 @@ def sleep(sleep_amount): expected_sleeps = [0.5, 1, 2, 4, 8, 16, 20, 20] assert sleeps[:8] == expected_sleeps + def test_tracking_warning_upload_event(self, caplog: LogCaptureFixture) -> None: + callback_queue = CallbackQueue() + storage = MockStorageNetworkThrottle() + + self.transfer_agent.get_object_storage = lambda x: storage + assert os.path.exists(self.foo_path) is True + self.transfer_queue.put( + UploadEvent( + callback_queue=callback_queue, + file_type=FileType.Wal, + file_path=Path("xlog/00000001000000000000000C"), + file_size=100, + source_data=Path(self.foo_path), + remove_after_upload=True, + metadata={"start-wal-segment": "00000001000000000000000C"}, + backup_site_name=self.test_site + ) + ) + + assert callback_queue.get(timeout=2.0) == CallbackEvent(success=True, payload={"file_size": 100}) + assert any( + record for record in caplog.records if record.levelname == "WARNING" and "has been inactive" in record.message + ) + @pytest.mark.timeout(30) def test_unknown_operation_raises_exception(self): class DummyEvent(BaseTransferEvent):