From 4f5211c9aa751fdb108670cc4253466865f76461 Mon Sep 17 00:00:00 2001
From: Kathia Barahona
Date: Tue, 19 Nov 2024 13:30:24 +0100
Subject: [PATCH] monitor pg connection when uploading chunks

---
 pghoard/basebackup/base.py         | 12 ++++++++----
 pghoard/basebackup/chunks.py       | 27 ++++++++++++++++++++++-----
 pghoard/basebackup/delta.py        | 30 +++++++++++++++++++++++-------
 pghoard/pgutil.py                  | 15 +++++++++++++--
 test/basebackup/test_basebackup.py | 25 +++++++++++++++++++++++++
 5 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 260bd850..20089618 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -35,6 +35,7 @@
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
 from pghoard.compressor import CompressionEvent
+from pghoard.pgutil import check_if_pg_connection_is_alive
 from pghoard.transfer import UploadEvent
 
 BASEBACKUP_NAME = "pghoard_base_backup"
@@ -543,6 +544,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
         self.log.debug("Connecting to database to start backup process")
         connection_string = connection_string_using_pgpass(self.connection_info)
         with psycopg2.connect(connection_string) as db_conn:
+            conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
             cursor = db_conn.cursor()
 
             if self.pg_version_server < 90600:
@@ -589,6 +591,7 @@
                         for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
                         if not item[1].endswith(".pem")  # Exclude such files like "dh1024.pem"
                     ),
+                    conn_polling=conn_polling,
                 )
                 chunks_count = len(chunk_files)
                 control_files_metadata_extra["chunks"] = chunk_files
@@ -607,11 +610,12 @@
                 # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
                 # is reserved for special files and metadata and will be generated last.
                 chunk_files = self.chunk_uploader.create_and_upload_chunks(
-                    chunks,
-                    data_file_format,
-                    compressed_base,
+                    chunks=chunks,
+                    data_file_format=data_file_format,
+                    temp_base_dir=compressed_base,
                     delta_stats=delta_stats,
-                    file_type=FileType.Basebackup_chunk
+                    file_type=FileType.Basebackup_chunk,
+                    conn_polling=conn_polling,
                 )
 
                 total_size_plain = sum(item["input_size"] for item in chunk_files)

diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py
index caa10d64..68213bfd 100644
--- a/pghoard/basebackup/chunks.py
+++ b/pghoard/basebackup/chunks.py
@@ -62,8 +62,16 @@ class DeltaStats:
 
 class ChunkUploader:
     def __init__(
-        self, metrics: Metrics, chunks_on_disk: int, encryption_data: EncryptionData, compression_data: CompressionData,
-        site_config: Dict[str, Any], site: str, is_running: Callable[[], bool], transfer_queue: TransferQueue
+        self,
+        *,
+        metrics: Metrics,
+        chunks_on_disk: int,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        site_config: Dict[str, Any],
+        site: str,
+        is_running: Callable[[], bool],
+        transfer_queue: TransferQueue,
     ):
         self.log = logging.getLogger("ChunkUploader")
         self.metrics = metrics
@@ -216,9 +224,15 @@ def handle_single_chunk(
         chunks,
         index: int,
         temp_dir: Path,
+        conn_polling: Callable[[], bool],
         delta_stats: Optional[DeltaStats] = None,
-        file_type: FileType = FileType.Basebackup_chunk
+        file_type: FileType = FileType.Basebackup_chunk,
     ) -> Dict[str, Any]:
+        # if the chunk depends on a PG connection and the connection
+        # is not alive, then abort the task
+        if not conn_polling():
+            raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
+
         one_chunk_files = chunks[index]
         chunk_name, input_size, result_size = self.tar_one_file(
             callback_queue=chunk_callback_queue,
@@ -260,12 +274,14 @@ def wait_for_chunk_transfer_to_complete(
 
     def create_and_upload_chunks(
         self,
+        *,
         chunks,
         data_file_format: Callable[[int], str],
+        conn_polling: Callable[[], bool],
         temp_base_dir: Path,
         delta_stats: Optional[DeltaStats] = None,
         file_type: FileType = FileType.Basebackup_chunk,
-        chunks_max_progress: float = 100.0
+        chunks_max_progress: float = 100.0,
     ) -> List[Dict[str, Any]]:
         start_time = time.monotonic()
         chunk_files = []
@@ -299,8 +315,9 @@ def create_and_upload_chunks(
                     chunks=chunks,
                     index=i,
                     temp_dir=temp_base_dir,
+                    conn_polling=conn_polling,
                     delta_stats=delta_stats,
-                    file_type=file_type
+                    file_type=file_type,
                 )
                 pending_compress_and_encrypt_tasks.append(task)
                 self.chunks_on_disk += 1

diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index 58e2b274..283dd100 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -55,10 +55,21 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):
 
 class DeltaBaseBackup:
     def __init__(
-        self, *, storage: BaseTransfer, site: str, site_config: Dict[str, Any], transfer_queue: TransferQueue,
-        metrics: Metrics, encryption_data: EncryptionData, compression_data: CompressionData,
-        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], parallel: int, temp_base_dir: Path,
-        compressed_base: Path, chunk_uploader: ChunkUploader, data_file_format: Callable[[int], str]
+        self,
+        *,
+        storage: BaseTransfer,
+        site: str,
+        site_config: Dict[str, Any],
+        transfer_queue: TransferQueue,
+        metrics: Metrics,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]],
+        parallel: int,
+        temp_base_dir: Path,
+        compressed_base: Path,
+        chunk_uploader: ChunkUploader,
+        data_file_format: Callable[[int], str],
     ):
         self.log = logging.getLogger("DeltaBaseBackup")
         self.storage = storage
@@ -384,11 +395,14 @@ def _split_files_for_upload(
 
         return delta_chunks, todo_hexdigests
 
-    def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
+    def _upload_chunks(
+        self, delta_chunks, chunks_max_progress: float, conn_polling: Callable[[], bool],
+    ) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
         """Upload small files grouped into chunks to save on latency and requests costs"""
         chunk_files = self.chunk_uploader.create_and_upload_chunks(
             chunks=delta_chunks,
             data_file_format=self.data_file_format,
+            conn_polling=conn_polling,
             temp_base_dir=self.compressed_base,
             file_type=FileType.Basebackup_delta_chunk,
             chunks_max_progress=chunks_max_progress,
@@ -426,7 +440,7 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
         return digests_metric, embed_metric
 
     def run(
-        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]]
+        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]], conn_polling: Callable[[], bool],
     ) -> Tuple[int, int, BackupManifest, int, List[Dict[str, Any]]]:
         # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder
         delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta")
@@ -459,7 +473,9 @@ def run(
                 sum(len(chunk) for chunk in delta_chunks)
             )
             chunks_max_progress = delta_chunks_count * 100.0 / (delta_chunks_count + len(todo_hexdigests))
-            chunks_metric, chunk_files = self._upload_chunks(delta_chunks, chunks_max_progress=chunks_max_progress)
+            chunks_metric, chunk_files = self._upload_chunks(
+                delta_chunks, chunks_max_progress=chunks_max_progress, conn_polling=conn_polling,
+            )
             self.log.info(
                 "Submitting hashes for upload: %r, total hashes in the snapshot: %r",
                 len(todo_hexdigests),

diff --git a/pghoard/pgutil.py b/pghoard/pgutil.py
index f379f58b..46cff2ae 100644
--- a/pghoard/pgutil.py
+++ b/pghoard/pgutil.py
@@ -5,10 +5,9 @@
 Copyright (c) 2015 Ohmu Ltd
 See LICENSE for details
 """
-
+from psycopg2 import OperationalError
 from urllib.parse import parse_qs, urlparse
 
-
 def create_connection_string(connection_info):
     return " ".join("{}='{}'".format(k, str(v).replace("'", "\\'")) for k, v in sorted(connection_info.items()))
 
@@ -92,3 +91,15 @@ def parse_connection_string_libpq(connection_string):
             value, connection_string = rem, ""
         fields[key] = value
     return fields
+
+
+def check_if_pg_connection_is_alive(db_conn) -> bool:
+    alive = False
+    try:
+        with db_conn.cursor() as cursor:
+            cursor.execute("SELECT 1;")
+            alive = True
+    except OperationalError:
+        alive = False
+    finally:
+        return alive

diff --git a/test/basebackup/test_basebackup.py b/test/basebackup/test_basebackup.py
index 961e524d..10a19063 100644
--- a/test/basebackup/test_basebackup.py
+++ b/test/basebackup/test_basebackup.py
@@ -946,3 +946,28 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs):  # pylint: d
             "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192,
             "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800
         }
+
+    @pytest.mark.parametrize(
+        "backup_mode", [BaseBackupMode.local_tar, BaseBackupMode.delta, BaseBackupMode.local_tar_delta_stats],
+    )
+    def test_create_basebackup_lost_pg_connection(self, capsys, db, pghoard, tmpdir, backup_mode: BaseBackupMode):
+        with patch("pghoard.basebackup.base.check_if_pg_connection_is_alive", return_value=False):
+            pghoard.create_backup_site_paths(pghoard.test_site)
+            basebackup_path = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "basebackup")
+            q = Queue()
+
+            pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = backup_mode
+            pghoard.config["backup_sites"][pghoard.test_site]["active_backup_mode"] = "archive_command"
+
+            now = datetime.datetime.now(datetime.timezone.utc)
+            metadata = {
+                "backup-reason": BackupReason.scheduled,
+                "backup-decision-time": now.isoformat(),
+                "normalized-backup-time": now.isoformat(),
+            }
+            pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata)
+            result = q.get(timeout=60)
+
+            assert result.success is False
+            assert result.exception and isinstance(result.exception, RuntimeError)
+            assert result.exception.args[0] == 'ERROR: PostgreSQL connection was lost during backup process.'
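
A minimal sketch of the polling contract introduced above, for illustration only: it assumes this patch is applied (so check_if_pg_connection_is_alive is importable from pghoard.pgutil), and the names make_conn_polling and upload_one_chunk are hypothetical stand-ins for the real call sites, PGBaseBackup.run_local_tar_basebackup and ChunkUploader.handle_single_chunk.

from typing import Callable

import psycopg2

# Assumes this patch is applied; check_if_pg_connection_is_alive is added to pghoard.pgutil above.
from pghoard.pgutil import check_if_pg_connection_is_alive


def make_conn_polling(db_conn) -> Callable[[], bool]:
    # Mirrors `conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)` in base.py:
    # bind the open backup connection into a zero-argument liveness probe.
    return lambda: check_if_pg_connection_is_alive(db_conn)


def upload_one_chunk(conn_polling: Callable[[], bool]) -> None:
    # Mirrors the guard added to ChunkUploader.handle_single_chunk: each chunk task
    # calls the probe before doing any work and aborts the backup if the connection died.
    if not conn_polling():
        raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
    # ... tar, compress, encrypt and enqueue the chunk as before ...


if __name__ == "__main__":
    # Example wiring only; the connection string is a placeholder.
    with psycopg2.connect("dbname=postgres") as db_conn:
        upload_one_chunk(make_conn_polling(db_conn))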
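
One possible follow-up on check_if_pg_connection_is_alive itself, offered as a suggestion rather than as part of the patch: returning from the finally block also swallows exceptions that the except OperationalError clause does not handle (for example psycopg2.InterfaceError, raised when the connection object is already closed, or KeyboardInterrupt), and linters flag the pattern. A variant with the same True/False behaviour for connection errors could look like this:

from psycopg2 import InterfaceError, OperationalError


def check_if_pg_connection_is_alive(db_conn) -> bool:
    # Returns True only if a trivial query succeeds on the given connection.
    # Unlike the finally/return version, unrelated exceptions propagate to the caller.
    try:
        with db_conn.cursor() as cursor:
            cursor.execute("SELECT 1;")
        return True
    except (OperationalError, InterfaceError):
        return False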