From d4b01a181e20ee6cca8fe49d1677d57ad3cbf8b0 Mon Sep 17 00:00:00 2001
From: Kathia Barahona
Date: Wed, 4 Oct 2023 13:48:01 +0000
Subject: [PATCH] upload event tracker

---
 pghoard/basebackup/base.py   |   4 -
 pghoard/basebackup/chunks.py |   4 -
 pghoard/basebackup/delta.py  |   1 -
 pghoard/pghoard.py           |  13 ++-
 pghoard/transfer.py          | 167 ++++++++++++++++++++++++++++++++++-
 test/test_transferagent.py   | 102 ++++++++++++++++-----
 6 files changed, 254 insertions(+), 37 deletions(-)

diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 76c97cb1..8aa635d1 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -316,9 +316,6 @@ def run_piped_basebackup(self):
         })
         metadata.update(self.metadata)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})
-
         self.transfer_queue.put(
             UploadEvent(
                 file_type=FileType.Basebackup,
@@ -326,7 +323,6 @@ def callback(n_bytes: int) -> None:
                 backup_site_name=self.site,
                 file_path=basebackup_path,
                 callback_queue=self.callback_queue,
                 file_size=compressed_file_size,
-                incremental_progress_callback=callback,
                 source_data=stream_target,
                 remove_after_upload=True,
                 metadata=metadata
diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py
index 575fd723..48fdc1f3 100644
--- a/pghoard/basebackup/chunks.py
+++ b/pghoard/basebackup/chunks.py
@@ -193,9 +193,6 @@ def tar_one_file(
         middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})
-
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
@@ -204,7 +201,6 @@ def callback(n_bytes: int) -> None:
                 file_type=file_type,
                 file_path=middle_path / chunk_name,
                 source_data=chunk_path,
                 metadata=metadata,
-                incremental_progress_callback=callback,
                 backup_site_name=self.site,
             )
         )
diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index edb3b353..7e181196 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -201,7 +201,6 @@ def callback(n_bytes: int) -> None:
                     backup_site_name=self.site,
                     metadata=metadata,
                     file_path=dest_path,
-                    incremental_progress_callback=callback,
                     source_data=chunk_path
                 )
             )
diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py
index bc7a5cec..1d12eead 100644
--- a/pghoard/pghoard.py
+++ b/pghoard/pghoard.py
@@ -45,7 +45,7 @@
     is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
 )
 from pghoard.receivexlog import PGReceiveXLog
-from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
+from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker
 from pghoard.walreceiver import WALReceiver
 from pghoard.webserver import WebServer
 
@@ -143,6 +143,9 @@ def __init__(self, config_path):
         self.requested_basebackup_sites = set()
         self.inotify_adapter = InotifyAdapter(self.compression_queue)
         self.inotify = InotifyWatcher(self.inotify_adapter)
+
+        self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)
+
         self.webserver = WebServer(
             self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
         )
@@ -167,6 +170,7 @@ def __init__(self, config_path):
             config=self.config,
             mp_manager=self.mp_manager,
             transfer_queue=self.transfer_queue,
+            upload_tracker=self.upload_tracker,
             metrics=self.metrics,
             shared_state_dict=self.transfer_agent_state
         )
@@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
     def start_threads_on_startup(self):
         # Startup threads
         self.inotify.start()
+        self.upload_tracker.start()
         self.webserver.start()
         self.wal_file_deleter.start()
         for compressor in self.compressors:
@@ -970,7 +975,11 @@ def load_config(self, _signal=None, _frame=None):  # pylint: disable=unused-argu
     def _get_all_threads(self):
         all_threads = []
-        # on first config load webserver isn't initialized yet
+
+        # on first config load upload_tracker and webserver aren't initialized yet
+        if hasattr(self, "upload_tracker"):
+            all_threads.append(self.upload_tracker)
+
         if hasattr(self, "webserver"):
             all_threads.append(self.webserver)
         all_threads.extend(self.basebackups.values())
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index c1510480..60d987c1 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -8,14 +8,16 @@
+import dataclasses
 import enum
 import logging
 import os
+import threading
 import time
 from contextlib import suppress
 from dataclasses import dataclass
+from functools import partial
 from io import BytesIO
 from pathlib import Path
 from queue import Empty
 from threading import Lock
-from typing import Any, BinaryIO, Dict, Optional, Union
+from typing import Any, BinaryIO, Dict, List, Optional, Union
 
 from rohmu import get_transfer
 from rohmu.errors import FileNotFoundFromStorageError
@@ -26,6 +28,7 @@
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
+from pghoard.metrics import Metrics
 
 _STATS_LOCK = Lock()
 _last_stats_transmit_time = 0
@@ -54,7 +57,6 @@ class UploadEvent(BaseTransferEvent):
     file_size: Optional[int]
     remove_after_upload: bool = True
     retry_number: int = 0
-    incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None
 
     @property
     def operation(self):
@@ -99,8 +101,153 @@ def operation(self):
 
 TransferQueue = Queue
 
 
+@dataclass
+class TransferIncrement:
+    n_bytes: float
+    tracked_at: float = dataclasses.field(default_factory=time.monotonic)
+
+
+@dataclass
+class UploadEventProgress:
+    key: str
+    file_size: Optional[int]
+    file_type: FileType
+    increments: List[TransferIncrement] = dataclasses.field(default_factory=list)
+    started_at: float = dataclasses.field(default_factory=time.monotonic)
+
+    def is_completed(self) -> bool:
+        # UploadEvents without a file_size are allowed, therefore we cannot determine if the event is completed
+        if not self.file_size:
+            return False
+
+        total_nbytes = sum(increment.n_bytes for increment in self.increments)
+        return total_nbytes >= self.file_size
+
+
+class UploadEventProgressTracker(PGHoardThread):
+    CHECK_FREQUENCY: float = 5  # check for progress every 5 seconds
+    WARNING_TIMEOUT: float = 5 * 60  # log a warning when there has been no progress in the last 5 minutes
+
+    def __init__(self, metrics: Metrics) -> None:
+        self.metrics = metrics
+        self.log = logging.getLogger("UploadEventProgressTracker")
+
+        self.running: bool = False
+
+        self._tracked_events: Dict[str, UploadEventProgress] = {}
+        self._tracked_events_lock = threading.Lock()
+        self.log.debug("UploadEventProgressTracker initialized")
+
+        super().__init__()
+
+    def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None:
+        with self._tracked_events_lock:
+            self.log.info("Tracking upload event for file %s", file_key)
+            self._tracked_events[file_key] = UploadEventProgress(
+                key=file_key,
+                file_type=file_type,
+                file_size=file_size
+            )
+
+    def untrack_upload_event(self, file_key: str) -> None:
+        with self._tracked_events_lock:
+            self._tracked_events.pop(file_key, None)
+
+    def increment(self, file_key: str, n_bytes: float) -> None:
+        with self._tracked_events_lock:
+            if file_key not in self._tracked_events:
+                raise Exception(f"UploadEvent for {file_key} is not being tracked.")
+
+            file_type = self._tracked_events[file_key].file_type
+            if file_type in (
+                FileType.Basebackup,
+                FileType.Basebackup_chunk,
+                FileType.Basebackup_delta,
+                FileType.Basebackup_delta_chunk
+            ):
+                is_delta = file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)
+                self.metrics.increase(
+                    "pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": is_delta},
+                )
+            elif file_type in (FileType.Wal, FileType.Timeline):
+                self.metrics.increase("pghoard.compressed_file_upload", inc_value=n_bytes)
+
+            self._tracked_events[file_key].increments.append(TransferIncrement(n_bytes=n_bytes))
+
+            # if the file is fully uploaded, stop tracking it
+            if self._tracked_events[file_key].is_completed():
+                self._tracked_events.pop(file_key)
+
+    def reset(self) -> None:
+        with self._tracked_events_lock:
+            self._tracked_events = {}
+        self.running = False
+
+    def run_safe(self):
+        try:
+            self.running = True
+
+            while self.running:
+                with self._tracked_events_lock:
+                    self._check_increment_rate()
+
+                time.sleep(self.CHECK_FREQUENCY)
+        except Exception:  # pylint: disable=broad-except
+            self.log.exception("UploadEventProgressTracker failed while checking upload progress")
+            self.metrics.increase("pghoard.transfer_operation.errors")
+            self.reset()
+            self.stop()
+
+        self.log.debug("Quitting UploadEventProgressTracker")
+
+    def stop(self) -> None:
+        self.running = False
+
+    def _check_increment_rate(self) -> None:
+        """
+        Check that each tracked upload is still progressing by comparing the time elapsed since its
+        last increment with the average interval between its previous increments. If an upload has
+        been inactive for too long, log a warning.
+        """
+        now = time.monotonic()
+        for ue_progress in self._tracked_events.values():
+            if ue_progress.is_completed():
+                continue
+
+            last_increment_at = ue_progress.started_at
+            avg_increment_interval = 0.
+
+            if ue_progress.increments:
+                # total waiting time between consecutive increments
+                total_increment_diff = sum(
+                    next_inc.tracked_at - prev_inc.tracked_at
+                    for prev_inc, next_inc in zip(ue_progress.increments, ue_progress.increments[1:])
+                )
+                if len(ue_progress.increments) > 1:
+                    avg_increment_interval = total_increment_diff / (len(ue_progress.increments) - 1)
+                last_increment_at = ue_progress.increments[-1].tracked_at
+
+            # log a warning in case we have not tracked any progress for the operation
+            # since the last check
+            if last_increment_at and (now - last_increment_at) >= avg_increment_interval + self.WARNING_TIMEOUT:
+                self.log.warning(
+                    "Upload for file %s has been inactive for %.1f seconds.",
+                    ue_progress.key,
+                    now - last_increment_at
+                )
+
+
 class TransferAgent(PGHoardThread):
-    def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict):
+    def __init__(
+        self,
+        config,
+        mp_manager,
+        transfer_queue: TransferQueue,
+        upload_tracker: UploadEventProgressTracker,
+        metrics: Metrics,
+        shared_state_dict,
+    ):
         super().__init__()
         self.log = logging.getLogger("TransferAgent")
         self.config = config
@@ -108,6 +255,7 @@ def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, s
         self.mp_manager = mp_manager
         self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
         self.transfer_queue = transfer_queue
+        self.upload_tracker = upload_tracker
         self.running = True
         self.sleep = time.sleep
         self.state = shared_state_dict
@@ -324,9 +472,20 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
             metadata = file_to_transfer.metadata.copy()
             if file_to_transfer.file_size:
                 metadata["Content-Length"] = str(file_to_transfer.file_size)
+            self.upload_tracker.track_upload_event(
+                file_key=key,
+                file_type=file_to_transfer.file_type,
+                file_size=file_to_transfer.file_size,
+            )
+            upload_progress_fn = partial(self.upload_tracker.increment, key)
             storage.store_file_object(
-                key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback
+                key,
+                f,
+                metadata=metadata,
+                upload_progress_fn=upload_progress_fn,
             )
+            # untrack the event explicitly: uploads without a known file_size are
+            # never removed by increment() itself
+            self.upload_tracker.untrack_upload_event(key)
             if unlink_local:
                 if isinstance(file_to_transfer.source_data, Path):
                     try:
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 713bb550..872d0bdf 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -4,6 +4,7 @@
 Copyright (c) 2015 Ohmu Ltd
 See LICENSE for details
 """
+import logging
 import os
 import time
 from pathlib import Path
@@ -15,7 +16,7 @@
 from pghoard import metrics
 from pghoard.common import CallbackEvent, FileType, QuitEvent
-from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent)
+from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)
 
 # pylint: disable=attribute-defined-outside-init
 from .base import PGHoardTestCase
@@ -40,6 +41,34 @@ def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimet
         raise StorageError("foo")
 
 
+class MockStorageNetworkThrottle(MockStorage):
+    """
+    Storage simulating network throttling when uploading files to object storage.
+    """
+
+    NUM_CHUNKS = 4
+    INCREMENT_WAIT_PER_CHUNK = [0.1, 0.1, 1, 0.1]
+
+    def store_file_object(
+        self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None
+    ):
+        file_size = int(metadata["Content-Length"]) if "Content-Length" in metadata else None
+        if not file_size:
+            return
+
+        chunk_size = round(file_size / self.NUM_CHUNKS)
+        for chunk_num in range(self.NUM_CHUNKS):
+            time.sleep(self.INCREMENT_WAIT_PER_CHUNK[chunk_num])
+            if upload_progress_fn:
+                upload_progress_fn(chunk_size)
+
+
+class PatchedUploadEventProgressTracker(UploadEventProgressTracker):
+    CHECK_FREQUENCY = .2
+    WARNING_TIMEOUT = .5
+
+
 class TestTransferAgent(PGHoardTestCase):
     def setup_method(self, method):
         super().setup_method(method)
@@ -66,16 +95,22 @@ def setup_method(self, method):
         self.compression_queue = Queue()
         self.transfer_queue = Queue()
+        self.upload_tracker = PatchedUploadEventProgressTracker(metrics=metrics.Metrics(statsd={}))
+        self.upload_tracker.start()
+
         self.transfer_agent = TransferAgent(
             config=self.config,
             mp_manager=None,
             transfer_queue=self.transfer_queue,
+            upload_tracker=self.upload_tracker,
             metrics=metrics.Metrics(statsd={}),
             shared_state_dict={}
         )
         self.transfer_agent.start()
 
     def teardown_method(self, method):
+        self.upload_tracker.stop()
+        self.upload_tracker.join()
         self.transfer_agent.running = False
         self.transfer_queue.put(QuitEvent)
         self.transfer_agent.join()
@@ -141,40 +176,38 @@ def test_handle_upload_xlog(self):
         assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3})
         assert os.path.exists(self.foo_path) is True
         expected_key = os.path.join(self.test_site, "xlog/00000001000000000000000C")
-        storage.store_file_object.assert_called_with(
-            expected_key,
-            ANY,
-            metadata={
-                "Content-Length": "3",
-                "start-wal-segment": "00000001000000000000000C"
-            },
-            upload_progress_fn=None
-        )
+
+        assert storage.store_file_object.call_count == 1
+        assert storage.store_file_object.call_args[0][0] == expected_key
+        assert storage.store_file_object.call_args[1]["metadata"] == {
+            "Content-Length": "3",
+            "start-wal-segment": "00000001000000000000000C"
+        }
+
         # Now check that the prefix is used.
         self._inject_prefix("site_specific_prefix")
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
                 file_type=FileType.Wal,
-                file_path=Path("xlog/00000001000000000000000C"),
+                file_path=Path("xlog/00000001000000000000000D"),
                 file_size=3,
                 remove_after_upload=True,
                 source_data=Path(self.foo_path),
-                metadata={"start-wal-segment": "00000001000000000000000C"},
+                metadata={"start-wal-segment": "00000001000000000000000D"},
                 backup_site_name=self.test_site
             )
         )
         assert callback_queue.get(timeout=1.0) == CallbackEvent(success=True, payload={"file_size": 3})
-        expected_key = "site_specific_prefix/xlog/00000001000000000000000C"
-        storage.store_file_object.assert_called_with(
-            expected_key,
-            ANY,
-            metadata={
-                "Content-Length": "3",
-                "start-wal-segment": "00000001000000000000000C"
-            },
-            upload_progress_fn=None
-        )
+        expected_key = "site_specific_prefix/xlog/00000001000000000000000D"
+
+        assert storage.store_file_object.call_count == 2
+        assert storage.store_file_object.call_args[0][0] == expected_key
+        assert storage.store_file_object.call_args[1]["metadata"] == {
+            "Content-Length": "3",
+            "start-wal-segment": "00000001000000000000000D"
+        }
+
         assert os.path.exists(self.foo_path) is False
 
     def test_handle_upload_basebackup(self):
@@ -229,6 +262,31 @@ def sleep(sleep_amount):
         expected_sleeps = [0.5, 1, 2, 4, 8, 16, 20, 20]
         assert sleeps[:8] == expected_sleeps
 
+    def test_tracking_warning_upload_event(self, caplog) -> None:
+        caplog.set_level(logging.WARNING)
+        callback_queue = Queue()
+        storage = MockStorageNetworkThrottle()
+
+        self.transfer_agent.get_object_storage = lambda x: storage
+        assert os.path.exists(self.foo_path) is True
+        self.transfer_queue.put(
+            UploadEvent(
+                callback_queue=callback_queue,
+                file_type=FileType.Wal,
+                file_path=Path("xlog/00000001000000000000000C"),
+                file_size=100,
+                source_data=Path(self.foo_path),
+                remove_after_upload=True,
+                metadata={"start-wal-segment": "00000001000000000000000C"},
+                backup_site_name=self.test_site
+            )
+        )
+
+        assert callback_queue.get(timeout=2.0) == CallbackEvent(success=True, payload={"file_size": 100})
+        assert any(
+            record for record in caplog.records
+            if record.levelname == "WARNING" and "has been inactive" in record.message
+        )
+
     @pytest.mark.timeout(30)
     def test_unknown_operation_raises_exception(self):
         class DummyEvent(BaseTransferEvent):
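
Usage sketch (illustrative, not part of the commit): the snippet below shows how the new
tracker is driven, mirroring what TransferAgent.handle_upload does around
store_file_object(). It assumes a pghoard checkout with this patch applied; the key name
and byte counts are made up for the example.

    from pghoard import metrics
    from pghoard.common import FileType
    from pghoard.transfer import UploadEventProgressTracker

    tracker = UploadEventProgressTracker(metrics=metrics.Metrics(statsd={}))
    tracker.start()  # background thread, checks progress every CHECK_FREQUENCY seconds

    key = "example-site/xlog/00000001000000000000000C"
    tracker.track_upload_event(file_key=key, file_type=FileType.Wal, file_size=100)

    # rohmu invokes upload_progress_fn with the bytes uploaded since the previous call;
    # TransferAgent wires that to partial(self.upload_tracker.increment, key).
    for _ in range(4):
        tracker.increment(key, n_bytes=25)

    # 4 * 25 bytes == file_size, so increment() already dropped the event as completed;
    # untrack_upload_event() also covers uploads whose file_size is unknown.
    tracker.untrack_upload_event(key)

    # The watchdog warns once
    #     now - last_increment_at >= avg_increment_interval + WARNING_TIMEOUT,
    # i.e. a stall of 5+ minutes relative to the upload's own average pace.
    tracker.stop()
    tracker.join()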