monitor pg connection when uploading chunks
kathia-barahona committed Nov 19, 2024
1 parent 5e22c85 commit 4f5211c
Showing 5 changed files with 91 additions and 18 deletions.
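
In outline, the commit threads a connection-liveness callback from the backup driver down into the chunk uploader: run_local_tar_basebackup binds check_if_pg_connection_is_alive to the open backup connection, passes the resulting zero-argument callable as conn_polling through create_and_upload_chunks (and the delta backup's run/_upload_chunks), and handle_single_chunk polls it before each chunk, so a lost PostgreSQL connection aborts the backup instead of uploading chunks that can never be completed. A minimal, self-contained sketch of the pattern — upload_chunks here is a hypothetical stand-in, not pghoard code; the real call chain is in the diff below:

    from typing import Callable, List

    def upload_chunks(chunks: List[str], *, conn_polling: Callable[[], bool]) -> None:
        for chunk in chunks:
            # Poll the liveness callback before handling each chunk and abort early
            # if the PostgreSQL connection backing the backup has gone away.
            if not conn_polling():
                raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
            print(f"uploading {chunk}")  # stand-in for tar + compress + upload

    # In pghoard the callable is bound to the open psycopg2 connection:
    #     conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
    upload_chunks(["chunk.0001", "chunk.0002"], conn_polling=lambda: True)
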
12 changes: 8 additions & 4 deletions pghoard/basebackup/base.py
@@ -35,6 +35,7 @@
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
 from pghoard.compressor import CompressionEvent
+from pghoard.pgutil import check_if_pg_connection_is_alive
 from pghoard.transfer import UploadEvent

 BASEBACKUP_NAME = "pghoard_base_backup"
@@ -543,6 +544,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
            self.log.debug("Connecting to database to start backup process")
            connection_string = connection_string_using_pgpass(self.connection_info)
            with psycopg2.connect(connection_string) as db_conn:
+                conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
                cursor = db_conn.cursor()

                if self.pg_version_server < 90600:
@@ -589,6 +591,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
                        for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
                        if not item[1].endswith(".pem")  # Exclude such files like "dh1024.pem"
                    ),
+                    conn_polling=conn_polling,
                )
                chunks_count = len(chunk_files)
                control_files_metadata_extra["chunks"] = chunk_files
@@ -607,11 +610,12 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
                # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
                # is reserved for special files and metadata and will be generated last.
                chunk_files = self.chunk_uploader.create_and_upload_chunks(
-                    chunks,
-                    data_file_format,
-                    compressed_base,
+                    chunks=chunks,
+                    data_file_format=data_file_format,
+                    temp_base_dir=compressed_base,
                    delta_stats=delta_stats,
-                    file_type=FileType.Basebackup_chunk
+                    file_type=FileType.Basebackup_chunk,
+                    conn_polling=conn_polling,
                )

                total_size_plain = sum(item["input_size"] for item in chunk_files)
27 changes: 22 additions & 5 deletions pghoard/basebackup/chunks.py
@@ -62,8 +62,16 @@ class DeltaStats:

 class ChunkUploader:
     def __init__(
-        self, metrics: Metrics, chunks_on_disk: int, encryption_data: EncryptionData, compression_data: CompressionData,
-        site_config: Dict[str, Any], site: str, is_running: Callable[[], bool], transfer_queue: TransferQueue
+        self,
+        *,
+        metrics: Metrics,
+        chunks_on_disk: int,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        site_config: Dict[str, Any],
+        site: str,
+        is_running: Callable[[], bool],
+        transfer_queue: TransferQueue,
     ):
         self.log = logging.getLogger("ChunkUploader")
         self.metrics = metrics
@@ -216,9 +224,15 @@ def handle_single_chunk(
         chunks,
         index: int,
         temp_dir: Path,
+        conn_polling: Callable[[], bool],
         delta_stats: Optional[DeltaStats] = None,
-        file_type: FileType = FileType.Basebackup_chunk
+        file_type: FileType = FileType.Basebackup_chunk,
     ) -> Dict[str, Any]:
+        # if the chunk is dependent on a PG connection and connection
+        # is not alive then abort the task
+        if not conn_polling():
+            raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
+
         one_chunk_files = chunks[index]
         chunk_name, input_size, result_size = self.tar_one_file(
             callback_queue=chunk_callback_queue,
@@ -260,12 +274,14 @@ def wait_for_chunk_transfer_to_complete(

     def create_and_upload_chunks(
         self,
+        *,
         chunks,
         data_file_format: Callable[[int], str],
+        conn_polling: Callable[[], bool],
         temp_base_dir: Path,
         delta_stats: Optional[DeltaStats] = None,
         file_type: FileType = FileType.Basebackup_chunk,
-        chunks_max_progress: float = 100.0
+        chunks_max_progress: float = 100.0,
     ) -> List[Dict[str, Any]]:
         start_time = time.monotonic()
         chunk_files = []
@@ -299,8 +315,9 @@ def create_and_upload_chunks(
                    chunks=chunks,
                    index=i,
                    temp_dir=temp_base_dir,
+                    conn_polling=conn_polling,
                    delta_stats=delta_stats,
-                    file_type=file_type
+                    file_type=file_type,
                )
                pending_compress_and_encrypt_tasks.append(task)
                self.chunks_on_disk += 1
30 changes: 23 additions & 7 deletions pghoard/basebackup/delta.py
@@ -55,10 +55,21 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):

 class DeltaBaseBackup:
     def __init__(
-        self, *, storage: BaseTransfer, site: str, site_config: Dict[str, Any], transfer_queue: TransferQueue,
-        metrics: Metrics, encryption_data: EncryptionData, compression_data: CompressionData,
-        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], parallel: int, temp_base_dir: Path,
-        compressed_base: Path, chunk_uploader: ChunkUploader, data_file_format: Callable[[int], str]
+        self,
+        *,
+        storage: BaseTransfer,
+        site: str,
+        site_config: Dict[str, Any],
+        transfer_queue: TransferQueue,
+        metrics: Metrics,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]],
+        parallel: int,
+        temp_base_dir: Path,
+        compressed_base: Path,
+        chunk_uploader: ChunkUploader,
+        data_file_format: Callable[[int], str],
     ):
         self.log = logging.getLogger("DeltaBaseBackup")
         self.storage = storage
@@ -384,11 +395,14 @@ def _split_files_for_upload(

         return delta_chunks, todo_hexdigests

-    def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
+    def _upload_chunks(
+        self, delta_chunks, chunks_max_progress: float, conn_polling: Callable[[], bool],
+    ) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
         """Upload small files grouped into chunks to save on latency and requests costs"""
         chunk_files = self.chunk_uploader.create_and_upload_chunks(
             chunks=delta_chunks,
             data_file_format=self.data_file_format,
+            conn_polling=conn_polling,
             temp_base_dir=self.compressed_base,
             file_type=FileType.Basebackup_delta_chunk,
             chunks_max_progress=chunks_max_progress,
@@ -426,7 +440,7 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
         return digests_metric, embed_metric

     def run(
-        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]]
+        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]], conn_polling: Callable[[], bool],
     ) -> Tuple[int, int, BackupManifest, int, List[Dict[str, Any]]]:
         # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder
         delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta")
@@ -459,7 +473,9 @@ def run(
            sum(len(chunk) for chunk in delta_chunks)
        )
        chunks_max_progress = delta_chunks_count * 100.0 / (delta_chunks_count + len(todo_hexdigests))
-        chunks_metric, chunk_files = self._upload_chunks(delta_chunks, chunks_max_progress=chunks_max_progress)
+        chunks_metric, chunk_files = self._upload_chunks(
+            delta_chunks, chunks_max_progress=chunks_max_progress, conn_polling=conn_polling,
+        )

        self.log.info(
            "Submitting hashes for upload: %r, total hashes in the snapshot: %r", len(todo_hexdigests),
15 changes: 13 additions & 2 deletions pghoard/pgutil.py
@@ -5,10 +5,9 @@
 Copyright (c) 2015 Ohmu Ltd
 See LICENSE for details
 """
-
+from psycopg2 import OperationalError
 from urllib.parse import parse_qs, urlparse


 def create_connection_string(connection_info):
     return " ".join("{}='{}'".format(k, str(v).replace("'", "\\'")) for k, v in sorted(connection_info.items()))
-
@@ -92,3 +91,15 @@ def parse_connection_string_libpq(connection_string):
                value, connection_string = rem, ""
        fields[key] = value
    return fields
+
+
+def check_if_pg_connection_is_alive(db_conn) -> bool:
+    alive = False
+    try:
+        with db_conn.cursor() as cursor:
+            cursor.execute("SELECT 1;")
+            alive = True
+    except OperationalError:
+        alive = False
+    finally:
+        return alive
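
The new helper issues a throwaway SELECT 1 on the existing connection and reports False when psycopg2 raises OperationalError, which makes it cheap enough to poll between chunks. A minimal usage sketch, assuming a reachable server and a hypothetical DSN (the lambda binding mirrors what base.py does in the diff above):

    import psycopg2

    from pghoard.pgutil import check_if_pg_connection_is_alive

    db_conn = psycopg2.connect("dbname=postgres user=postgres")  # hypothetical DSN
    conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)

    # True while the backend session is healthy; False once the server drops or
    # kills the session and the SELECT 1 probe raises OperationalError.
    print(conn_polling())
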
25 changes: 25 additions & 0 deletions test/basebackup/test_basebackup.py
@@ -946,3 +946,28 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs):  # pylint: d
            "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192,
            "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800
        }
+
+    @pytest.mark.parametrize(
+        "backup_mode", [BaseBackupMode.local_tar, BaseBackupMode.delta, BaseBackupMode.local_tar_delta_stats],
+    )
+    def test_create_basebackup_lost_pg_connection(self, capsys, db, pghoard, tmpdir, backup_mode: BaseBackupMode):
+        with patch("pghoard.basebackup.base.check_if_pg_connection_is_alive", return_value=False):
+            pghoard.create_backup_site_paths(pghoard.test_site)
+            basebackup_path = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "basebackup")
+            q = Queue()
+
+            pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = backup_mode
+            pghoard.config["backup_sites"][pghoard.test_site]["active_backup_mode"] = "archive_command"
+
+            now = datetime.datetime.now(datetime.timezone.utc)
+            metadata = {
+                "backup-reason": BackupReason.scheduled,
+                "backup-decision-time": now.isoformat(),
+                "normalized-backup-time": now.isoformat(),
+            }
+            pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata)
+            result = q.get(timeout=60)
+
+            assert result.success is False
+            assert result.exception and isinstance(result.exception, RuntimeError)
+            assert result.exception.args[0] == 'ERROR: PostgreSQL connection was lost during backup process.'
