upload event tracker #604

Closed · wants to merge 1 commit
4 changes: 0 additions & 4 deletions pghoard/basebackup/base.py
@@ -316,17 +316,13 @@ def run_piped_basebackup(self):
         })
         metadata.update(self.metadata)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})
-
         self.transfer_queue.put(
             UploadEvent(
                 file_type=FileType.Basebackup,
                 backup_site_name=self.site,
                 file_path=basebackup_path,
                 callback_queue=self.callback_queue,
                 file_size=compressed_file_size,
-                incremental_progress_callback=callback,
                 source_data=stream_target,
                 remove_after_upload=True,
                 metadata=metadata
4 changes: 0 additions & 4 deletions pghoard/basebackup/chunks.py
@@ -193,9 +193,6 @@ def tar_one_file(

         middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)
 
-        def callback(n_bytes: int) -> None:
-            self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})
-
         self.transfer_queue.put(
             UploadEvent(
                 callback_queue=callback_queue,
@@ -204,7 +201,6 @@ def callback(n_bytes: int) -> None:
                 file_path=middle_path / chunk_name,
                 source_data=chunk_path,
                 metadata=metadata,
-                incremental_progress_callback=callback,
                 backup_site_name=self.site,
             )
         )
1 change: 0 additions & 1 deletion pghoard/basebackup/delta.py
@@ -201,7 +201,6 @@ def callback(n_bytes: int) -> None:
                 backup_site_name=self.site,
                 metadata=metadata,
                 file_path=dest_path,
-                incremental_progress_callback=callback,
                 source_data=chunk_path
             )
         )
13 changes: 11 additions & 2 deletions pghoard/pghoard.py
@@ -45,7 +45,7 @@
     is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
 )
 from pghoard.receivexlog import PGReceiveXLog
-from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
+from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker
 from pghoard.walreceiver import WALReceiver
 from pghoard.webserver import WebServer

@@ -143,6 +143,9 @@ def __init__(self, config_path):
         self.requested_basebackup_sites = set()
         self.inotify_adapter = InotifyAdapter(self.compression_queue)
         self.inotify = InotifyWatcher(self.inotify_adapter)
+
+        self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)
+
         self.webserver = WebServer(
             self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
         )
@@ -167,6 +170,7 @@ def __init__(self, config_path):
             config=self.config,
             mp_manager=self.mp_manager,
             transfer_queue=self.transfer_queue,
+            upload_tracker=self.upload_tracker,
             metrics=self.metrics,
             shared_state_dict=self.transfer_agent_state
         )
@@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
     def start_threads_on_startup(self):
         # Startup threads
         self.inotify.start()
+        self.upload_tracker.start()
         self.webserver.start()
         self.wal_file_deleter.start()
         for compressor in self.compressors:
@@ -970,7 +975,11 @@ def load_config(self, _signal=None, _frame=None): # pylint: disable=unused-argu

     def _get_all_threads(self):
         all_threads = []
-        # on first config load webserver isn't initialized yet
+
+        # on first config load upload_tracker and webserver aren't initialized yet
+        if hasattr(self, "upload_tracker"):
+            all_threads.append(self.upload_tracker)
+
         if hasattr(self, "webserver"):
             all_threads.append(self.webserver)
         all_threads.extend(self.basebackups.values())
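Taken together, the pghoard.py changes register the tracker as one more long-lived daemon thread, guarded with hasattr() because _get_all_threads() can run before __init__ has finished. The following sketch reruns that guard pattern in isolation; _StubTracker and _StubDaemon are invented stand-ins, not code from this PR:

```python
import threading
import time


class _StubTracker(threading.Thread):
    """Stand-in for UploadEventProgressTracker; only the thread lifecycle is shown."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.running = False

    def run(self) -> None:
        self.running = True
        while self.running:
            time.sleep(0.1)  # placeholder for the periodic _check_increment_rate() pass

    def stop(self) -> None:
        self.running = False


class _StubDaemon:
    """Mimics PGHoard's hasattr() guard in _get_all_threads()."""

    def _get_all_threads(self):
        threads = []
        # on first config load upload_tracker isn't initialized yet
        if hasattr(self, "upload_tracker"):
            threads.append(self.upload_tracker)
        return threads


daemon = _StubDaemon()
assert daemon._get_all_threads() == []  # before the attribute exists, nothing to manage
daemon.upload_tracker = _StubTracker()
daemon.upload_tracker.start()           # started alongside the other daemon threads
assert daemon._get_all_threads()        # now included when threads are joined/restarted
daemon.upload_tracker.stop()
```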
174 changes: 170 additions & 4 deletions pghoard/transfer.py
@@ -8,14 +8,16 @@
+import dataclasses
 import enum
 import logging
 import os
+import threading
 import time
 from contextlib import suppress
 from dataclasses import dataclass
 from functools import partial
 from io import BytesIO
 from pathlib import Path
 from queue import Empty
 from threading import Lock
-from typing import Any, BinaryIO, Dict, Optional, Union
+from typing import Any, BinaryIO, Dict, List, Optional, Union

from rohmu import get_transfer
from rohmu.errors import FileNotFoundFromStorageError
@@ -26,6 +28,7 @@
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
+from pghoard.metrics import Metrics

_STATS_LOCK = Lock()
_last_stats_transmit_time = 0
@@ -54,7 +57,6 @@ class UploadEvent(BaseTransferEvent):
     file_size: Optional[int]
     remove_after_upload: bool = True
     retry_number: int = 0
-    incremental_progress_callback: Optional[IncrementalProgressCallbackType] = None

     @property
     def operation(self):
@@ -99,15 +101,167 @@ def operation(self):
TransferQueue = Queue


+@dataclass
+class TransferIncrement:
+    n_bytes: float
+    tracked_at: float = dataclasses.field(default_factory=time.monotonic)
+
+
+@dataclass
+class UploadEventProgress:
+    key: str
+    file_size: Optional[int]
+    file_type: FileType
+    increments: List[TransferIncrement] = dataclasses.field(default_factory=list)
+    started_at: float = dataclasses.field(default_factory=time.monotonic)
+
+    def is_completed(self) -> bool:
+        # UploadEvents without file_size are allowed, therefore we cannot determine if the event is completed
+        if not self.file_size:
+            return False
[Author review comment on lines +119 to +121] question: Not sure why we allow this, though.
+
+        total_nbytes = sum(increment.n_bytes for increment in self.increments)
+        return total_nbytes >= self.file_size


+class UploadEventProgressTracker(PGHoardThread):
+    CHECK_FREQUENCY: int = 5  # check every 5 seconds for progress
+    WARNING_TIMEOUT: int = 5 * 60  # log a warning in case there is no progress during the last 5 minutes
+
+    def __init__(
+        self,
+        *,
+        metrics: Metrics,
+        tags: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        self.metrics = metrics
+        self.log = logging.getLogger("UploadEventProgressTracker")
+        self.tags = tags
+
+        self.running: bool = False
+
+        self._tracked_events: Dict[str, UploadEventProgress] = {}
+        self._tracked_events_lock = threading.Lock()
+        self.log.debug("UploadEventProgressTracker initialized")
+
+        super().__init__()
+
+    def track_upload_event(self, file_key: str, file_type: FileType, file_size: Optional[int]) -> None:
+        with self._tracked_events_lock:
+            self.log.info(f"Tracking upload event for file {file_key}")
+            self._tracked_events[file_key] = UploadEventProgress(
+                key=file_key,
+                file_type=file_type,
+                file_size=file_size
+            )
+
+    def untrack_upload_event(self, file_key: str) -> None:
+        if file_key not in self._tracked_events:
+            return
+
+        with self._tracked_events_lock:
+            self._tracked_events.pop(file_key)
+
+    def increment(self, file_key: str, n_bytes: float) -> None:
+        with self._tracked_events_lock:
+            if file_key not in self._tracked_events:
+                raise Exception(f"UploadEvent for {file_key} is not being tracked.")
[Author review comment on lines +167 to +168] probably need to raise something like UntrackedUploadEventError, otherwise this will stop the thread. (A hedged sketch of this idea follows the class below.)
+
+            if self._tracked_events[file_key].file_type in (
+                FileType.Basebackup,
+                FileType.Basebackup_chunk,
+                FileType.Basebackup_delta,
+                FileType.Basebackup_delta_chunk
+            ):
+                self.metrics.increase(
+                    "pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True},
+                )
+            elif self._tracked_events[file_key].file_type in (FileType.Wal, FileType.Timeline):
+                self.metrics.increase("pghoard.compressed_file_upload", inc_value=n_bytes, tags=self.tags)
+
+            self._tracked_events[file_key].increments.append(TransferIncrement(n_bytes=n_bytes))
+
+            # if the file is fully uploaded, then stop tracking it
+            if self._tracked_events[file_key].is_completed():
+                self._tracked_events.pop(file_key)
+
+    def reset(self) -> None:
+        with self._tracked_events_lock:
+            self._tracked_events = {}
+            self.running = False

+    def run_safe(self):
+        try:
+            self.running = True
+
+            while self.running:
+                with self._tracked_events_lock:
+                    self._check_increment_rate()
+
+                time.sleep(self.CHECK_FREQUENCY)
+        except Exception:  # pylint: disable=broad-except
+            self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload")
+            self.metrics.increase("pghoard.transfer_operation.errors")
+            self.reset()
+            self.stop()
+
+        self.log.debug("Quitting UploadEventProgressTracker")
+
+    def stop(self) -> None:
+        self.running = False

+    def _check_increment_rate(self) -> None:
+        """
+        Check if the transfer operation is progressing by comparing the time elapsed since the
+        last increment with the average time it took for previous increments. If the operation
+        has been inactive, a warning will be logged.
+        """
+        now = time.monotonic()
+        for ue_progress in self._tracked_events.values():
+            if ue_progress.is_completed():
+                continue
+
+            last_increment_at = ue_progress.started_at
+            avg_rate = 0.
+
+            if ue_progress.increments:
+                # total "waiting" time between all increments
+                total_increment_diff = sum(
+                    next_inc.tracked_at - prev_inc.tracked_at
+                    for prev_inc, next_inc in zip(ue_progress.increments, ue_progress.increments[1:])
+                )
+                if len(ue_progress.increments) > 1:
+                    avg_rate = total_increment_diff / (len(ue_progress.increments) - 1)
+                last_increment_at = ue_progress.increments[-1].tracked_at
+
+            # log a warning in case we have not tracked any progress for the operation since
+            # the last check
+            if last_increment_at and (now - last_increment_at) >= avg_rate + self.WARNING_TIMEOUT:
+                self.log.warning(
+                    "Upload for file %s has been inactive for %s seconds.",
+                    ue_progress.file_size,
[Author review comment] oops, should be ue_progress.key instead
+                    now - last_increment_at
+                )


 class TransferAgent(PGHoardThread):
-    def __init__(self, config, mp_manager, transfer_queue: TransferQueue, metrics, shared_state_dict):
+    def __init__(
+        self,
+        config,
+        mp_manager,
+        transfer_queue: TransferQueue,
+        upload_tracker: UploadEventProgressTracker,
+        metrics,
+        shared_state_dict,
+    ):
         super().__init__()
         self.log = logging.getLogger("TransferAgent")
         self.config = config
         self.metrics = metrics
         self.mp_manager = mp_manager
         self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
         self.transfer_queue = transfer_queue
+        self.upload_tracker = upload_tracker
         self.running = True
         self.sleep = time.sleep
         self.state = shared_state_dict
@@ -311,6 +465,7 @@ def handle_download(self, site, key, file_to_transfer):
         return CallbackEvent(success=False, exception=ex, opaque=file_to_transfer.opaque)
 
     def handle_upload(self, site, key, file_to_transfer: UploadEvent):
+        self.log.debug("Handling upload of %r", key)
         payload = {"file_size": file_to_transfer.file_size}
         try:
             storage = self.get_object_storage(site)
@@ -324,9 +479,20 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
             metadata = file_to_transfer.metadata.copy()
             if file_to_transfer.file_size:
                 metadata["Content-Length"] = str(file_to_transfer.file_size)
+            self.upload_tracker.track_upload_event(
+                file_key=key,
+                file_type=file_to_transfer.file_type,
+                file_size=file_to_transfer.file_size,
+            )
+            upload_progress_fn = partial(self.upload_tracker.increment, file_key=key)
             storage.store_file_object(
-                key, f, metadata=metadata, upload_progress_fn=file_to_transfer.incremental_progress_callback
+                key,
+                f,
+                metadata=metadata,
+                upload_progress_fn=lambda n_bytes: upload_progress_fn(n_bytes=n_bytes),
             )
+            # make sure we untrack it manually
+            self.upload_tracker.untrack_upload_event(key)
             if unlink_local:
                 if isinstance(file_to_transfer.source_data, Path):
                     try:
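The net effect of the transfer.py changes is that per-event callbacks are replaced by a central track, increment, untrack flow driven from handle_upload, with rohmu's store_file_object invoking upload_progress_fn(n_bytes) as it uploads. The sketch below reruns that flow with a stub tracker; the stub class, key and sizes are invented for illustration:

```python
from functools import partial


class _StubTracker:
    """Stand-in for UploadEventProgressTracker; tracks cumulative bytes per key."""

    def __init__(self) -> None:
        self._events = {}

    def track_upload_event(self, file_key, file_type=None, file_size=None):
        self._events[file_key] = 0

    def increment(self, file_key, n_bytes):
        self._events[file_key] += n_bytes

    def untrack_upload_event(self, file_key):
        self._events.pop(file_key, None)


tracker = _StubTracker()
key = "example-site/basebackup/chunk.0001"  # invented key
tracker.track_upload_event(file_key=key, file_size=8192)
upload_progress_fn = partial(tracker.increment, file_key=key)

# storage.store_file_object(...) would call this once per uploaded chunk:
upload_progress_fn(n_bytes=4096)
upload_progress_fn(n_bytes=4096)

tracker.untrack_upload_event(key)  # handle_upload untracks explicitly after the upload
```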