Merge pull request #477 from aiven/refactor_queues
Refactor queue events.

kmichel-aiven authored Nov 3, 2021
2 parents 6b363cd + ba14476 commit 6a7a213
Showing 17 changed files with 919 additions and 692 deletions.
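The theme of the refactor: untyped dicts posted to the compression and transfer queues are replaced with typed event objects (CallbackEvent, CompressionEvent, UploadEvent) keyed by a FileType enum. A minimal sketch of the pattern, with field names inferred from the hunks below rather than copied from pghoard.common / pghoard.transfer, so treat the exact definitions as assumptions:

# Sketch only: shapes inferred from this diff; the real classes may
# carry additional fields and different enum values.
import enum
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from typing import Dict, Optional


class FileType(enum.Enum):
    Basebackup = "basebackup"              # string values are assumptions
    Basebackup_chunk = "basebackup_chunk"
    Basebackup_delta = "basebackup_delta"


@dataclass(frozen=True)
class UploadEvent:
    file_type: FileType
    backup_site_key: str                   # which backup site owns the file
    file_path: Path                        # destination path, relative to the site
    source_data: Path                      # local file to upload
    metadata: Dict[str, str]
    file_size: Optional[int] = None
    callback_queue: Optional[Queue] = None
    remove_after_upload: bool = False      # delete the local file once uploaded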
157 changes: 95 additions & 62 deletions pghoard/basebackup.py
@@ -16,26 +16,29 @@
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Thread
from typing import Dict, Optional

import psycopg2

from pghoard.compressor import CompressionEvent
from pghoard.rohmu import dates, errors, rohmufile
from pghoard.rohmu.compat import suppress

# pylint: disable=superfluous-parens
from . import common, version, wal
from .basebackup_delta import DeltaBaseBackup
from .common import (
BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, extract_pghoard_bb_v2_metadata,
replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, FileType, connection_string_using_pgpass,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from .patchedtarfile import tarfile
from .rohmu.delta.common import EMBEDDED_FILE_SIZE
from .transfer import UploadEvent

BASEBACKUP_NAME = "pghoard_base_backup"
EMPTY_DIRS = [
@@ -164,25 +167,33 @@ def run(self):

if self.callback_queue:
# post a failure event
self.callback_queue.put({"success": False})
self.callback_queue.put(CallbackEvent(success=False, exception=ex))

finally:
self.running = False

@staticmethod
def get_paths_for_backup(basebackup_path):
def get_backup_path(self) -> Path:
"""
Build a unique backup path
FIXME: this should look at the object storage, not the local incoming
dir.
"""
i = 0
# FIXME: self.basebackup_path should be a path relative to the
# repo root, which we can then use to derive a path relative to the
# local directory. For now, take care of it here.
basebackup_path = Path(self.basebackup_path)
incoming_basebackup_path = Path(self.basebackup_path + "_incoming")
local_repo_root = basebackup_path.parent
relative_basebackup_dir = basebackup_path.relative_to(local_repo_root)
while True:
tsdir = "{}_{}".format(datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M"), i)
raw_basebackup = os.path.join(basebackup_path + "_incoming", tsdir)
compressed_basebackup = os.path.join(basebackup_path, tsdir)
# The backup directory names need not to be a sequence, so we lean towards skipping over any
# partial or leftover progress below. Make sure we only return paths if we're able to create the
# raw_basebackup directory.
if not os.path.exists(raw_basebackup) and not os.path.exists(compressed_basebackup):
with suppress(FileExistsError):
os.makedirs(raw_basebackup)
return raw_basebackup, compressed_basebackup
tsfilename = "{}_{}".format(datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M"), i)
basebackup_path = relative_basebackup_dir / tsfilename
local_basebackup_path = incoming_basebackup_path / tsfilename
if not local_basebackup_path.exists():
local_basebackup_path.mkdir(exist_ok=True)
return basebackup_path, local_basebackup_path
i += 1
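
Two changes in the hunk above: failure reporting posts a structured CallbackEvent instead of a bare {"success": False} dict, so consumers can inspect the exception, and get_paths_for_backup becomes get_backup_path, returning both the repo-relative destination and the local incoming directory. A sketch of the CallbackEvent shape implied here (the real class is imported from pghoard.common and may differ):

# Minimal sketch, assuming only the fields visible in this diff.
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CallbackEvent:
    success: bool
    exception: Optional[Exception] = None

# Producer side, as in run() above:
#     self.callback_queue.put(CallbackEvent(success=False, exception=ex))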

def get_command_line(self, output_name):
Expand Down Expand Up @@ -241,7 +252,8 @@ def basebackup_compression_pipe(self, proc, basebackup_path):
"host": socket.gethostname(),
}

with NamedTemporaryFile(prefix=basebackup_path, suffix=".tmp-compress") as output_obj:
# FIXME: more str operations on paths
with NamedTemporaryFile(prefix=str(basebackup_path), suffix=".tmp-compress") as output_obj:

def extract_header_func(input_data):
# backup_label should always be first in the tar ball
@@ -293,16 +305,17 @@ def run_piped_basebackup(self):
connection_string, _ = replication_connection_string_and_slot_using_pgpass(self.connection_info)
start_wal_segment = wal.get_current_lsn_from_identify_system(connection_string).walfile_name

temp_basebackup_dir, compressed_basebackup = self.get_paths_for_backup(self.basebackup_path)
basebackup_path, local_basebackup_path = self.get_backup_path()
command = self.get_command_line("-")
self.log.debug("Starting to run: %r", command)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
setattr(proc, "basebackup_start_time", time.monotonic())

self.pid = proc.pid
self.log.info("Started: %r, running as PID: %r, basebackup_location: %r", command, self.pid, compressed_basebackup)
self.log.info("Started: %r, running as PID: %r, basebackup_location: %r", command, self.pid, basebackup_path)

stream_target = os.path.join(temp_basebackup_dir, "data.tmp")
stream_target = local_basebackup_path / "data.tmp"
stream_target.parent.mkdir(parents=True, exist_ok=True)

# catch any os level exceptions such out of disk space, so that the underlying
# OS process gets properly cleaned up by check_command_success
@@ -317,7 +330,6 @@
self.metrics.unexpected_exception(e, where="PGBaseBackup")

self.check_command_success(proc, stream_target)
os.rename(stream_target, compressed_basebackup)

# Since we might not be able to parse the backup label we cheat with the start-wal-segment and
# start-time a bit. The start-wal-segment is the segment currently being written before
@@ -340,15 +352,18 @@
})
metadata.update(self.metadata)

self.transfer_queue.put({
"callback_queue": self.callback_queue,
"file_size": compressed_file_size,
"filetype": "basebackup",
"local_path": compressed_basebackup,
"metadata": metadata,
"site": self.site,
"type": "UPLOAD",
})
self.transfer_queue.put(
UploadEvent(
file_type=FileType.Basebackup,
backup_site_key=self.site,
file_path=basebackup_path,
callback_queue=self.callback_queue,
file_size=compressed_file_size,
source_data=stream_target,
remove_after_upload=True,
metadata=metadata
)
)
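
Note the dropped os.rename: the streamed data.tmp no longer moves into the compressed directory; the UploadEvent instead names the temp file as source_data and sets remove_after_upload=True, leaving cleanup to the transfer side. A hypothetical consumer-side sketch; "storage" stands in for a rohmu transfer object, and process_upload is illustrative, not pghoard's actual TransferAgent code:

import os

from pghoard.common import CallbackEvent


def process_upload(storage, event) -> None:
    with open(event.source_data, "rb") as fileobj:
        storage.store_file_object(str(event.file_path), fileobj, metadata=event.metadata)
    if event.remove_after_upload:
        os.unlink(event.source_data)       # temp-file cleanup now happens here
    if event.callback_queue:
        event.callback_queue.put(CallbackEvent(success=True))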

def parse_backup_label(self, backup_label_data):
if isinstance(backup_label_data, str):
@@ -369,9 +384,11 @@ def parse_backup_label_in_tar(self, basebackup_path):
return self.parse_backup_label(content)

def run_basic_basebackup(self):
basebackup_directory, _ = self.get_paths_for_backup(self.basebackup_path)
basebackup_tar_file = os.path.join(basebackup_directory, "base.tar")
command = self.get_command_line(basebackup_directory)
# FIXME: handling of path should be cleaner but keep it like this for
# now.
basebackup_path, local_basebackup_path = self.get_backup_path()
basebackup_tar_file = local_basebackup_path / "base.tar"
command = self.get_command_line(local_basebackup_path)

self.log.debug("Starting to run: %r", command)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -391,19 +408,22 @@ def run_basic_basebackup(self):
if proc.poll() is not None:
break
self.check_command_success(proc, basebackup_tar_file)

start_wal_segment, start_time = self.parse_backup_label_in_tar(basebackup_tar_file)
self.compression_queue.put({
"callback_queue": self.callback_queue,
"full_path": basebackup_tar_file,
"metadata": {
**self.metadata,
"start-time": start_time,
"start-wal-segment": start_wal_segment,
"active-backup-mode": self.site_config["active_backup_mode"],
},
"type": "CLOSE_WRITE",
})
self.compression_queue.put(
CompressionEvent(
callback_queue=self.callback_queue,
file_type=FileType.Basebackup,
file_path=basebackup_path,
source_data=basebackup_tar_file,
backup_site_key=self.site,
metadata={
**self.metadata,
"start-time": start_time,
"start-wal-segment": start_wal_segment,
"active-backup-mode": self.site_config["active_backup_mode"],
}
)
)
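
run_basic_basebackup now enqueues a CompressionEvent carrying both the logical destination (file_path) and the local tar to read (source_data), replacing the CLOSE_WRITE dict keyed on full_path. A sketch of the shape implied by this call site (the real class lives in pghoard.compressor and may have more fields):

from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from typing import Dict, Optional

from pghoard.common import FileType


@dataclass(frozen=True)
class CompressionEvent:
    file_type: FileType
    file_path: Path          # destination, relative to the backup site
    source_data: Path        # local file to compress and hand to transfer
    backup_site_key: str
    metadata: Dict[str, str]
    callback_queue: Optional[Queue] = None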

def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label):
mtime = time.time()
@@ -551,7 +571,7 @@ def tar_one_file(
chunk_path,
files_to_backup,
callback_queue,
filetype="basebackup_chunk",
file_type=FileType.Basebackup_chunk,
extra_metadata=None,
delta_stats=None
):
@@ -604,19 +624,32 @@ def tar_one_file(
}
if extra_metadata:
metadata.update(extra_metadata)
self.transfer_queue.put({
"callback_queue": callback_queue,
"file_size": result_size,
"filetype": filetype,
"local_path": chunk_path,
"metadata": metadata,
"site": self.site,
"type": "UPLOAD",
})
# FIXME: handle the key computation before here
chunk_path = Path(chunk_path)
base_repo = chunk_path.parent.parent.parent
chunk_name = chunk_path.relative_to(base_repo)
if file_type == FileType.Basebackup_chunk:
middle_path = Path("basebackup_chunk")
elif file_type == FileType.Basebackup:
middle_path = Path("basebackup")
chunk_name = chunk_name.name
elif file_type == FileType.Basebackup_delta:
middle_path = Path("basebackup_delta")
chunk_name = chunk_name.name
self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
file_size=result_size,
file_type=file_type,
file_path=middle_path / chunk_name,
source_data=chunk_path,
metadata=metadata,
backup_site_key=self.site,
)
)

# Get the name of the chunk and the name of the parent directory (ie backup "name")
chunk_name = "/".join(chunk_path.split("/")[-2:])
return chunk_name, input_size, result_size
return str(chunk_name), input_size, result_size
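
The FIXME above flags the inline destination-key computation: the if/elif chain maps a FileType to a storage prefix and decides whether the chunk keeps its repo-relative path or just its filename. The same logic as a table-driven sketch (helper name is illustrative, not pghoard API):

from pathlib import Path

from pghoard.common import FileType

# Prefixes mirror the if/elif chain in tar_one_file above.
_PREFIXES = {
    FileType.Basebackup: Path("basebackup"),
    FileType.Basebackup_chunk: Path("basebackup_chunk"),
    FileType.Basebackup_delta: Path("basebackup_delta"),
}


def destination_path(chunk_path: Path, file_type: FileType) -> Path:
    if file_type == FileType.Basebackup_chunk:
        # chunks keep their trailing path components relative to the repo root
        base_repo = chunk_path.parent.parent.parent
        return _PREFIXES[file_type] / chunk_path.relative_to(base_repo)
    return _PREFIXES[file_type] / chunk_path.name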

def wait_for_chunk_transfer_to_complete(self, chunk_count, upload_results, chunk_callback_queue, start_time):
try:
@@ -638,6 +671,7 @@ def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index
temp_dir=temp_dir,
files_to_backup=one_chunk_files,
delta_stats=delta_stats,
file_type=FileType.Basebackup_chunk
)
self.log.info(
"Queued backup chunk %r for transfer, chunks on disk (including partial): %r, current: %r, total chunks: %r",
Expand Down Expand Up @@ -729,8 +763,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
if not os.path.isdir(pgdata):
raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata))

temp_base_dir, compressed_base = self.get_paths_for_backup(self.basebackup_path)
os.makedirs(compressed_base)
_, compressed_base = self.get_backup_path()
data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format

# Default to 2GB chunks of uncompressed data
@@ -809,7 +842,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
compression_data=self.compression_data,
get_remote_basebackups_info=self.get_remote_basebackups_info,
parallel=self.site_config["basebackup_threads"],
temp_base_dir=temp_base_dir,
temp_base_dir=compressed_base,
compressed_base=compressed_base
)
total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run(
Expand Down Expand Up @@ -837,7 +870,7 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
# Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
# is reserved for special files and metadata and will be generated last.
chunk_files = self.create_and_upload_chunks(
chunks, data_file_format, temp_base_dir, delta_stats=delta_stats
chunks, data_file_format, compressed_base, delta_stats=delta_stats
)

total_size_plain = sum(item["input_size"] for item in chunk_files)
@@ -943,9 +976,9 @@ def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
self.tar_one_file(
callback_queue=self.callback_queue,
chunk_path=data_file_format(0), # pylint: disable=too-many-format-args
temp_dir=temp_base_dir,
temp_dir=compressed_base,
files_to_backup=control_files,
filetype="basebackup",
file_type=FileType.Basebackup,
extra_metadata={
**self.metadata,
"end-time": backup_end_time,
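
With every producer in basebackup.py now emitting typed events, a queue round-trip can be exercised directly. A hypothetical example using the imports this commit introduces; field values are made up, and any extra required fields would need filling in:

from pathlib import Path
from queue import Queue

from pghoard.common import CallbackEvent, FileType
from pghoard.transfer import UploadEvent

callback_queue: Queue = Queue()
transfer_queue: Queue = Queue()

# Producer: enqueue an upload, as run_piped_basebackup does above.
transfer_queue.put(
    UploadEvent(
        file_type=FileType.Basebackup,
        backup_site_key="default",                  # hypothetical site name
        file_path=Path("basebackup/2021-11-03_0"),  # hypothetical destination
        source_data=Path("/tmp/data.tmp"),          # hypothetical local file
        metadata={"host": "example"},
        file_size=1024,
        callback_queue=callback_queue,
    )
)

# Consumer: acknowledge with a typed callback instead of {"success": True}.
event = transfer_queue.get()
assert event.file_type is FileType.Basebackup
event.callback_queue.put(CallbackEvent(success=True))
assert callback_queue.get().success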
29 changes: 16 additions & 13 deletions pghoard/basebackup_delta.py
@@ -10,16 +10,18 @@
import uuid
from contextlib import suppress
from multiprocessing.dummy import Pool
from pathlib import Path
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from typing import Callable, Dict

from pghoard.common import (BackupFailure, BaseBackupFormat, extract_pghoard_delta_v1_metadata)
from pghoard.common import (BackupFailure, BaseBackupFormat, FileType, extract_pghoard_delta_v1_metadata)
from pghoard.rohmu import rohmufile
from pghoard.rohmu.dates import now
from pghoard.rohmu.delta.common import (BackupManifest, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from pghoard.rohmu.delta.snapshot import Snapshotter
from pghoard.rohmu.errors import FileNotFoundFromStorageError
from pghoard.transfer import UploadEvent


class DeltaBaseBackup:
@@ -169,18 +171,19 @@ def update_hash(data):
"host": socket.gethostname(),
}

self.transfer_queue.put({
"callback_queue": callback_queue,
"file_size": result_size,
"filetype": "basebackup_delta",
"local_path": chunk_path,
"metadata": metadata,
"site": self.site,
"type": "UPLOAD",
"delta": {
"hexdigest": result_digest,
},
})
dest_path = Path("basebackup_delta") / result_digest

self.transfer_queue.put(
UploadEvent(
callback_queue=callback_queue,
file_size=result_size,
file_type=FileType.Basebackup_delta,
backup_site_key=self.site,
metadata=metadata,
file_path=dest_path,
source_data=chunk_path
)
)
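
The per-file hexdigest no longer rides in a nested "delta" dict; it becomes the object name itself, basebackup_delta/<hexdigest>. A consumer needing the digest can recover it from the event, e.g.:

# Sketch: the digest is now the final component of the upload target.
hexdigest = event.file_path.name                       # event: UploadEvent
assert str(event.file_path.parent) == "basebackup_delta"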

return input_size, result_size, result_digest, skip_upload

(diff for the remaining 15 changed files not shown)
