Merge pull request #418 from aiven/alex-delta-backups
Add support for delta basebackups [BF-6]

rikonen authored Apr 12, 2021
2 parents fbe66b0 + 366467e commit 0202a0b
Showing 29 changed files with 1,661 additions and 130 deletions.
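The new mode is selected per backup site through the existing basebackup_mode setting, which run() in pghoard/basebackup.py now dispatches on (see the diff below). A minimal sketch of a site configuration that would enable it; the key names follow the config lookups visible in this diff, while the "delta" string value for BaseBackupMode.delta and all concrete values are assumptions, not part of this commit:

config = {
    "backup_sites": {
        "example-site": {
            "basebackup_mode": "delta",  # new mode added by this PR; string value assumed
            "pg_data_directory": "/var/lib/pgsql/data",  # placeholder path
            "basebackup_threads": 2,  # also passed to DeltaBaseBackup as parallel=
        },
    },
    "compression": {"algorithm": "snappy", "level": 0},  # read via CompressionData.from_config()
}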
1 change: 1 addition & 0 deletions .gitignore
@@ -18,3 +18,4 @@ __pycache__/
/venv/
/.venv/
*.orig
/pghoard-rpm-src/
4 changes: 4 additions & 0 deletions .pylintrc
@@ -28,7 +28,11 @@ disable=

[FORMAT]
max-line-length=125
max-module-lines=1000

[REPORTS]
output-format=text
reports=no

[TYPECHECK]
extension-pkg-whitelist=pydantic
2 changes: 1 addition & 1 deletion Makefile
@@ -1,4 +1,4 @@
short_ver = 2.1.1
short_ver = $(shell git describe --abbrev=0)
long_ver = $(shell git describe --long 2>/dev/null || echo $(short_ver)-0-unknown-g`git describe --always`)
generated = pghoard/version.py

2 changes: 1 addition & 1 deletion pghoard.spec
@@ -7,7 +7,7 @@ License: ASL 2.0
Source0: pghoard-rpm-src.tar
Requires: systemd
Requires: python3-botocore, python3-cryptography >= 0.8, python3-dateutil
Requires: python3-psycopg2, python3-requests, python3-snappy
Requires: python3-psycopg2, python3-requests, python3-snappy, python3-zstandard, python3-pydantic,
Conflicts: pgespresso92 < 1.2, pgespresso93 < 1.2, pgespresso94 < 1.2, pgespresso95 < 1.2
BuildRequires: python3-flake8, python3-pytest, python3-pylint, python3-devel, golang

169 changes: 121 additions & 48 deletions pghoard/basebackup.py
@@ -14,9 +14,11 @@
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Thread
from typing import Optional

import psycopg2

@@ -25,8 +27,10 @@

# pylint: disable=superfluous-parens
from . import common, version, wal
from .basebackup_delta import DeltaBaseBackup
from .common import (
connection_string_using_pgpass, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass,
replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from .patchedtarfile import tarfile
@@ -46,14 +50,38 @@
]


class BackupFailure(Exception):
"""Backup failed - post a failure to callback_queue and allow the thread to terminate"""


class NoException(BaseException):
"""Exception that's never raised, used in conditional except blocks"""


@dataclass(frozen=True)
class EncryptionData:
encryption_key_id: Optional[str]
rsa_public_key: Optional[str]

@staticmethod
def from_site_config(site_config) -> "EncryptionData":
encryption_key_id = site_config["encryption_key_id"]
if encryption_key_id:
rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"]
else:
rsa_public_key = None

return EncryptionData(encryption_key_id=encryption_key_id, rsa_public_key=rsa_public_key)


@dataclass(frozen=True)
class CompressionData:
algorithm: str
level: int

@staticmethod
def from_config(config) -> "CompressionData":
algorithm = config["compression"]["algorithm"]
level = config["compression"]["level"]
return CompressionData(algorithm=algorithm, level=level)


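# Editor's illustration, not part of this commit: the two frozen dataclasses above
# centralize config lookups that were previously repeated inline. Assuming a
# backup_sites entry shaped like pghoard's usual configuration:
#
#     site_config = {
#         "encryption_key_id": "backup-key",
#         "encryption_keys": {"backup-key": {"public": "<PEM public key>"}},
#     }
#     enc = EncryptionData.from_site_config(site_config)
#     enc.encryption_key_id, enc.rsa_public_key   # ("backup-key", "<PEM public key>")
#
#     comp = CompressionData.from_config({"compression": {"algorithm": "snappy", "level": 0}})
#     comp.algorithm, comp.level                  # ("snappy", 0)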
class PGBaseBackup(Thread):
def __init__(
self,
@@ -63,10 +91,12 @@ def __init__(
basebackup_path,
compression_queue,
metrics,
storage,
transfer_queue=None,
callback_queue=None,
pg_version_server=None,
metadata=None
metadata=None,
get_remote_basebackups_info=None
):
super().__init__()
self.log = logging.getLogger("PGBaseBackup")
@@ -84,15 +114,19 @@ def __init__(
self.pid = None
self.pg_version_server = pg_version_server
self.latest_activity = datetime.datetime.utcnow()
self.storage = storage
self.get_remote_basebackups_info = get_remote_basebackups_info

def run(self):
try:
basebackup_mode = self.config["backup_sites"][self.site]["basebackup_mode"]
if basebackup_mode == "basic":
basebackup_mode = self.site_config["basebackup_mode"]
if basebackup_mode == BaseBackupMode.basic:
self.run_basic_basebackup()
elif basebackup_mode == "local-tar":
elif basebackup_mode == BaseBackupMode.local_tar:
self.run_local_tar_basebackup()
elif basebackup_mode == "pipe":
elif basebackup_mode == BaseBackupMode.delta:
self.run_local_tar_basebackup(delta=True)
elif basebackup_mode == BaseBackupMode.pipe:
self.run_piped_basebackup()
else:
raise errors.InvalidConfigurationError("Unsupported basebackup_mode {!r}".format(basebackup_mode))
@@ -129,7 +163,7 @@ def get_paths_for_backup(basebackup_path):

def get_command_line(self, output_name):
command = [
self.config["backup_sites"][self.site]["pg_basebackup_path"],
self.site_config["pg_basebackup_path"],
"--format",
"tar",
"--label",
@@ -139,7 +173,7 @@ def get_command_line(self, output_name):
output_name,
]

if self.config["backup_sites"][self.site]["active_backup_mode"] == "standalone_hot_backup":
if self.site_config["active_backup_mode"] == "standalone_hot_backup":
if self.pg_version_server >= 100000:
command.extend(["--wal-method=fetch"])
else:
@@ -169,9 +203,9 @@ def check_command_success(self, proc, output_file):

def basebackup_compression_pipe(self, proc, basebackup_path):
rsa_public_key = None
encryption_key_id = self.config["backup_sites"][self.site]["encryption_key_id"]
encryption_key_id = self.site_config["encryption_key_id"]
if encryption_key_id:
rsa_public_key = self.config["backup_sites"][self.site]["encryption_keys"][encryption_key_id]["public"]
rsa_public_key = self.site_config["encryption_keys"][encryption_key_id]["public"]
compression_algorithm = self.config["compression"]["algorithm"]
compression_level = self.config["compression"]["level"]
self.log.debug("Compressing basebackup directly to file: %r", basebackup_path)
@@ -461,25 +495,30 @@ def add_entry(archive_path, local_path, *, missing_ok):
yield from add_directory(archive_path, local_path, missing_ok=False)
yield archive_path, local_path, False, "leave"

@property
def site_config(self):
return self.config["backup_sites"][self.site]

@property
def encryption_data(self) -> EncryptionData:
return EncryptionData.from_site_config(self.site_config)

@property
def compression_data(self) -> CompressionData:
return CompressionData.from_config(self.config)

def tar_one_file(
self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None
):
start_time = time.monotonic()

site_config = self.config["backup_sites"][self.site]
encryption_key_id = site_config["encryption_key_id"]
if encryption_key_id:
rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"]
else:
rsa_public_key = None

with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
# pylint: disable=bad-continuation
with rohmufile.file_writer(
compression_algorithm=self.config["compression"]["algorithm"],
compression_level=self.config["compression"]["level"],
compression_threads=site_config["basebackup_compression_threads"],
rsa_public_key=rsa_public_key,
compression_algorithm=self.compression_data.algorithm,
compression_level=self.compression_data.level,
compression_threads=self.site_config["basebackup_compression_threads"],
rsa_public_key=self.encryption_data.rsa_public_key,
fileobj=raw_output_obj
) as output_obj:
with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
@@ -492,7 +531,7 @@ def tar_one_file(
os.link(raw_output_obj.name, chunk_path)

rohmufile.log_compression_result(
encrypted=bool(encryption_key_id),
encrypted=bool(self.encryption_data.encryption_key_id),
elapsed=time.monotonic() - start_time,
original_size=input_size,
result_size=result_size,
@@ -505,16 +544,16 @@
"pghoard.compressed_size_ratio",
size_ratio,
tags={
"algorithm": self.config["compression"]["algorithm"],
"algorithm": self.compression_data.algorithm,
"site": self.site,
"type": "basebackup",
}
)

metadata = {
"compression-algorithm": self.config["compression"]["algorithm"],
"encryption-key-id": encryption_key_id,
"format": "pghoard-bb-v2",
"compression-algorithm": self.compression_data.algorithm,
"encryption-key-id": self.encryption_data.encryption_key_id,
"format": BaseBackupFormat.v2,
"original-file-size": input_size,
"host": socket.gethostname(),
}
@@ -573,9 +612,8 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
self.chunks_on_disk = 0
i = 0

site_config = self.config["backup_sites"][self.site]
max_chunks_on_disk = site_config["basebackup_chunks_in_progress"]
threads = site_config["basebackup_threads"]
max_chunks_on_disk = self.site_config["basebackup_chunks_in_progress"]
threads = self.site_config["basebackup_threads"]
with ThreadPoolExecutor(max_workers=threads) as tpe:
pending_compress_and_encrypt_tasks = []
while i < len(chunks):
@@ -612,8 +650,9 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):

return chunk_files

def run_local_tar_basebackup(self):
pgdata = self.config["backup_sites"][self.site]["pg_data_directory"]
def run_local_tar_basebackup(self, delta=False):
control_files_metadata_extra = {}
pgdata = self.site_config["pg_data_directory"]
if not os.path.isdir(pgdata):
raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata))

Expand All @@ -622,7 +661,7 @@ def run_local_tar_basebackup(self):
data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format

# Default to 2GB chunks of uncompressed data
target_chunk_size = self.config["backup_sites"][self.site]["basebackup_chunk_size"]
target_chunk_size = self.site_config["basebackup_chunk_size"]

self.log.debug("Connecting to database to start backup process")
connection_string = connection_string_using_pgpass(self.connection_info)
@@ -686,13 +725,45 @@ def run_local_tar_basebackup(self):
self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
start_time = time.monotonic()

total_file_count, chunks = self.find_and_split_files_to_backup(
pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
)
if delta:
delta_backup = DeltaBaseBackup(
storage=self.storage,
site=self.site,
site_config=self.site_config,
transfer_queue=self.transfer_queue,
metrics=self.metrics,
encryption_data=self.encryption_data,
compression_data=self.compression_data,
get_remote_basebackups_info=self.get_remote_basebackups_info,
parallel=self.site_config["basebackup_threads"],
temp_base_dir=temp_base_dir,
compressed_base=compressed_base
)
total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run(
pgdata=pgdata,
src_iterate_func=lambda: (
item[1]
for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
if not item[1].endswith(".pem")  # Exclude files such as "dh1024.pem"
),
)

chunks_count = total_file_count
control_files_metadata_extra["manifest"] = manifest.jsondict()
self.metadata["format"] = BaseBackupFormat.delta_v1
else:
total_file_count, chunks = self.find_and_split_files_to_backup(
pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
)
chunks_count = len(chunks)
# Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
# is reserved for special files and metadata and will be generated last.
chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)

# Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
# is reserved for special files and metadata and will be generated last.
chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)
total_size_plain = sum(item["input_size"] for item in chunk_files)
total_size_enc = sum(item["result_size"] for item in chunk_files)

control_files_metadata_extra["chunks"] = chunk_files

# Everything is now tarred up, grab the latest pg_control and stop the backup process
with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp:
@@ -709,14 +780,16 @@ def run_local_tar_basebackup(self):
db_conn.commit()
backup_stopped = True

total_size_plain = sum(item["input_size"] for item in chunk_files)
total_size_enc = sum(item["result_size"] for item in chunk_files)
backup_time = time.monotonic() - start_time
self.metrics.gauge(
"pghoard.backup_time_{}".format(self.site_config["basebackup_mode"]),
backup_time,
)

self.log.info(
"Basebackup generation finished, %r files, %r chunks, "
"%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, len(chunk_files),
total_size_plain, total_size_enc,
time.monotonic() - start_time
"%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count,
total_size_plain, total_size_enc, backup_time
)

finally:
@@ -740,13 +813,13 @@ def run_local_tar_basebackup(self):
"backup_end_wal_segment": backup_end_wal_segment,
"backup_start_time": backup_start_time,
"backup_start_wal_segment": backup_start_wal_segment,
"chunks": chunk_files,
"pgdata": pgdata,
"pghoard_object": "basebackup",
"pghoard_version": version.__version__,
"tablespaces": tablespaces,
"host": socket.gethostname(),
}
metadata.update(control_files_metadata_extra)
control_files = list(
self.get_control_entries_for_tar(
metadata=metadata,

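In summary, the two local backup flavours now record different bookkeeping in the control chunk's metadata: the plain local-tar path keeps its per-chunk list, while delta mode stores the manifest returned by DeltaBaseBackup.run() and marks the backup metadata with the new BaseBackupFormat.delta_v1. A rough sketch of the control_files_metadata_extra contents in each case, with placeholder values and assuming manifest.jsondict() returns a plain JSON-serializable dict:

# local-tar: list of per-chunk bookkeeping dicts from create_and_upload_chunks();
# only the input_size/result_size fields are visible in this diff, other fields omitted
local_tar_extra = {"chunks": [{"input_size": 2147483648, "result_size": 312345678}]}

# delta: a single manifest describing the backed-up files
delta_extra = {"manifest": {}}  # manifest.jsondict() from DeltaBaseBackup.run()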