Skip to content

Commit

Permalink
Merge pull request #605 from Aiven-Open/track_wal_file_upload_progress
Browse files Browse the repository at this point in the history
upload event tracker
  • Loading branch information
alexole authored Jan 23, 2024
2 parents ad8bc84 + f64ffb5 commit bcf1099
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Vagrant.configure("2") do |config|
sed -i "s/^#create_main_cluster.*/create_main_cluster=false/g" /etc/postgresql-common/createcluster.conf
apt-get install -y python{3.8,3.9,3.10} python{3.8,3.9,3.10}-dev python{3.8,3.9,3.10}-venv
apt-get install -y postgresql-{10,11,12,13,14} postgresql-server-dev-{10,11,12,13,14}
apt-get install -y postgresql-{11,12,13,14,15,16} postgresql-server-dev-{11,12,13,14,15,16}
username="$(< /dev/urandom tr -dc a-z | head -c${1:-32};echo;)"
password=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c${1:-32};echo;)
Expand Down
6 changes: 0 additions & 6 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,13 @@ def run_piped_basebackup(self):
})
metadata.update(self.metadata)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": False})

self.transfer_queue.put(
UploadEvent(
file_type=FileType.Basebackup,
backup_site_name=self.site,
file_path=basebackup_path,
callback_queue=self.callback_queue,
file_size=compressed_file_size,
incremental_progress_callback=callback,
source_data=stream_target,
remove_after_upload=True,
metadata=metadata
Expand Down Expand Up @@ -615,7 +611,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
data_file_format,
compressed_base,
delta_stats=delta_stats,
delta=delta,
file_type=FileType.Basebackup_chunk
)

Expand Down Expand Up @@ -725,7 +720,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
callback_queue=self.callback_queue,
chunk_path=Path(data_file_format(0)), # pylint: disable=too-many-format-args
temp_dir=compressed_base,
delta=delta,
files_to_backup=control_files,
file_type=FileType.Basebackup,
extra_metadata={
Expand Down
9 changes: 0 additions & 9 deletions pghoard/basebackup/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def tar_one_file(
chunk_path,
files_to_backup,
callback_queue: CallbackQueue,
delta: bool,
file_type: FileType = FileType.Basebackup_chunk,
extra_metadata: Optional[Dict[str, Any]] = None,
delta_stats: Optional[DeltaStats] = None
Expand Down Expand Up @@ -194,9 +193,6 @@ def tar_one_file(

middle_path, chunk_name = ChunkUploader.chunk_path_to_middle_path_name(Path(chunk_path), file_type)

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": delta})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -205,7 +201,6 @@ def callback(n_bytes: int) -> None:
file_path=middle_path / chunk_name,
source_data=chunk_path,
metadata=metadata,
incremental_progress_callback=callback,
backup_site_name=self.site,
)
)
Expand All @@ -221,15 +216,13 @@ def handle_single_chunk(
chunks,
index: int,
temp_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk
) -> Dict[str, Any]:
one_chunk_files = chunks[index]
chunk_name, input_size, result_size = self.tar_one_file(
callback_queue=chunk_callback_queue,
chunk_path=chunk_path,
delta=delta,
temp_dir=temp_dir,
files_to_backup=one_chunk_files,
delta_stats=delta_stats,
Expand Down Expand Up @@ -270,7 +263,6 @@ def create_and_upload_chunks(
chunks,
data_file_format: Callable[[int], str],
temp_base_dir: Path,
delta: bool,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk,
chunks_max_progress: float = 100.0
Expand Down Expand Up @@ -307,7 +299,6 @@ def create_and_upload_chunks(
chunks=chunks,
index=i,
temp_dir=temp_base_dir,
delta=delta,
delta_stats=delta_stats,
file_type=file_type
)
Expand Down
5 changes: 0 additions & 5 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ def progress_callback(n_bytes: int = 1) -> None:

dest_path = Path("basebackup_delta") / result_digest

def callback(n_bytes: int) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
Expand All @@ -205,7 +202,6 @@ def callback(n_bytes: int) -> None:
backup_site_name=self.site,
metadata=metadata,
file_path=dest_path,
incremental_progress_callback=callback,
source_data=chunk_path
)
)
Expand Down Expand Up @@ -370,7 +366,6 @@ def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[Uplo
chunk_files = self.chunk_uploader.create_and_upload_chunks(
chunks=delta_chunks,
data_file_format=self.data_file_format,
delta=True,
temp_base_dir=self.compressed_base,
file_type=FileType.Basebackup_delta_chunk,
chunks_max_progress=chunks_max_progress,
Expand Down
13 changes: 11 additions & 2 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
)
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
from pghoard.walreceiver import WALReceiver
from pghoard.webserver import WebServer

Expand Down Expand Up @@ -143,6 +143,9 @@ def __init__(self, config_path):
self.requested_basebackup_sites = set()
self.inotify_adapter = InotifyAdapter(self.compression_queue)
self.inotify = InotifyWatcher(self.inotify_adapter)

self.upload_tracker = UploadEventProgressTracker(metrics=self.metrics)

self.webserver = WebServer(
self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
)
Expand All @@ -167,6 +170,7 @@ def __init__(self, config_path):
config=self.config,
mp_manager=self.mp_manager,
transfer_queue=self.transfer_queue,
upload_tracker=self.upload_tracker,
metrics=self.metrics,
shared_state_dict=self.transfer_agent_state
)
Expand Down Expand Up @@ -695,6 +699,7 @@ def startup_walk_for_missed_files(self):
def start_threads_on_startup(self):
# Startup threads
self.inotify.start()
self.upload_tracker.start()
self.webserver.start()
self.wal_file_deleter.start()
for compressor in self.compressors:
Expand Down Expand Up @@ -971,7 +976,11 @@ def load_config(self, _signal=None, _frame=None): # pylint: disable=unused-argu

def _get_all_threads(self):
all_threads = []
# on first config load webserver isn't initialized yet

# on first config load upload_tracker and webserver aren't initialized yet
if hasattr(self, "upload_tracker"):
all_threads.append(self.upload_tracker)

if hasattr(self, "webserver"):
all_threads.append(self.webserver)
all_threads.extend(self.basebackups.values())
Expand Down
Loading

0 comments on commit bcf1099

Please sign in to comment.