From e5ba9348ef945a1e9cd754c62ed83982897d3305 Mon Sep 17 00:00:00 2001
From: Kathia Barahona
Date: Tue, 19 Nov 2024 13:30:24 +0100
Subject: [PATCH] monitor pg connection when uploading chunks

---
 pghoard/basebackup/base.py         | 12 ++++--
 pghoard/basebackup/chunks.py       | 27 +++++++++++---
 pghoard/basebackup/delta.py        | 30 +++++++++++----
 pghoard/pgutil.py                  | 15 +++++++-
 test/basebackup/test_basebackup.py | 59 ++++++++++++++++++++++--------
 5 files changed, 110 insertions(+), 33 deletions(-)

diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py
index 260bd850..20089618 100644
--- a/pghoard/basebackup/base.py
+++ b/pghoard/basebackup/base.py
@@ -35,6 +35,7 @@
     set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
 )
 from pghoard.compressor import CompressionEvent
+from pghoard.pgutil import check_if_pg_connection_is_alive
 from pghoard.transfer import UploadEvent
 
 BASEBACKUP_NAME = "pghoard_base_backup"
@@ -543,6 +544,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
         self.log.debug("Connecting to database to start backup process")
         connection_string = connection_string_using_pgpass(self.connection_info)
         with psycopg2.connect(connection_string) as db_conn:
+            conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
             cursor = db_conn.cursor()
 
             if self.pg_version_server < 90600:
@@ -589,6 +591,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
                         for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
                         if not item[1].endswith(".pem")  # Exclude such files like "dh1024.pem"
                     ),
+                    conn_polling=conn_polling,
                 )
                 chunks_count = len(chunk_files)
                 control_files_metadata_extra["chunks"] = chunk_files
@@ -607,11 +610,12 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
                 # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
                 # is reserved for special files and metadata and will be generated last.
                 chunk_files = self.chunk_uploader.create_and_upload_chunks(
-                    chunks,
-                    data_file_format,
-                    compressed_base,
+                    chunks=chunks,
+                    data_file_format=data_file_format,
+                    temp_base_dir=compressed_base,
                     delta_stats=delta_stats,
-                    file_type=FileType.Basebackup_chunk
+                    file_type=FileType.Basebackup_chunk,
+                    conn_polling=conn_polling,
                 )
 
                 total_size_plain = sum(item["input_size"] for item in chunk_files)
diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py
index caa10d64..68213bfd 100644
--- a/pghoard/basebackup/chunks.py
+++ b/pghoard/basebackup/chunks.py
@@ -62,8 +62,16 @@ class DeltaStats:
 
 class ChunkUploader:
     def __init__(
-        self, metrics: Metrics, chunks_on_disk: int, encryption_data: EncryptionData, compression_data: CompressionData,
-        site_config: Dict[str, Any], site: str, is_running: Callable[[], bool], transfer_queue: TransferQueue
+        self,
+        *,
+        metrics: Metrics,
+        chunks_on_disk: int,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        site_config: Dict[str, Any],
+        site: str,
+        is_running: Callable[[], bool],
+        transfer_queue: TransferQueue,
     ):
         self.log = logging.getLogger("ChunkUploader")
         self.metrics = metrics
@@ -216,9 +224,15 @@ def handle_single_chunk(
         chunks,
         index: int,
         temp_dir: Path,
+        conn_polling: Callable[[], bool],
         delta_stats: Optional[DeltaStats] = None,
-        file_type: FileType = FileType.Basebackup_chunk
+        file_type: FileType = FileType.Basebackup_chunk,
     ) -> Dict[str, Any]:
+        # If this chunk depends on a PG connection and that connection
+        # is no longer alive, abort the task.
+        if not conn_polling():
+            raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
+
         one_chunk_files = chunks[index]
         chunk_name, input_size, result_size = self.tar_one_file(
             callback_queue=chunk_callback_queue,
@@ -260,12 +274,14 @@ def wait_for_chunk_transfer_to_complete(
 
     def create_and_upload_chunks(
         self,
+        *,
         chunks,
         data_file_format: Callable[[int], str],
+        conn_polling: Callable[[], bool],
         temp_base_dir: Path,
         delta_stats: Optional[DeltaStats] = None,
         file_type: FileType = FileType.Basebackup_chunk,
-        chunks_max_progress: float = 100.0
+        chunks_max_progress: float = 100.0,
     ) -> List[Dict[str, Any]]:
         start_time = time.monotonic()
         chunk_files = []
@@ -299,8 +315,9 @@ def create_and_upload_chunks(
                     chunks=chunks,
                     index=i,
                     temp_dir=temp_base_dir,
+                    conn_polling=conn_polling,
                     delta_stats=delta_stats,
-                    file_type=file_type
+                    file_type=file_type,
                 )
                 pending_compress_and_encrypt_tasks.append(task)
                 self.chunks_on_disk += 1
diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py
index 58e2b274..283dd100 100644
--- a/pghoard/basebackup/delta.py
+++ b/pghoard/basebackup/delta.py
@@ -55,10 +55,21 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):
 
 class DeltaBaseBackup:
     def __init__(
-        self, *, storage: BaseTransfer, site: str, site_config: Dict[str, Any], transfer_queue: TransferQueue,
-        metrics: Metrics, encryption_data: EncryptionData, compression_data: CompressionData,
-        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], parallel: int, temp_base_dir: Path,
-        compressed_base: Path, chunk_uploader: ChunkUploader, data_file_format: Callable[[int], str]
+        self,
+        *,
+        storage: BaseTransfer,
+        site: str,
+        site_config: Dict[str, Any],
+        transfer_queue: TransferQueue,
+        metrics: Metrics,
+        encryption_data: EncryptionData,
+        compression_data: CompressionData,
+        get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]],
+        parallel: int,
+        temp_base_dir: Path,
+        compressed_base: Path,
+        chunk_uploader: ChunkUploader,
+        data_file_format: Callable[[int], str],
     ):
         self.log = logging.getLogger("DeltaBaseBackup")
         self.storage = storage
@@ -384,11 +395,14 @@ def _split_files_for_upload(
 
         return delta_chunks, todo_hexdigests
 
-    def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
+    def _upload_chunks(
+        self, delta_chunks, chunks_max_progress: float, conn_polling: Callable[[], bool],
+    ) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
         """Upload small files grouped into chunks to save on latency and requests costs"""
         chunk_files = self.chunk_uploader.create_and_upload_chunks(
             chunks=delta_chunks,
             data_file_format=self.data_file_format,
+            conn_polling=conn_polling,
             temp_base_dir=self.compressed_base,
             file_type=FileType.Basebackup_delta_chunk,
             chunks_max_progress=chunks_max_progress,
@@ -426,7 +440,7 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
         return digests_metric, embed_metric
 
     def run(
-        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]]
+        self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]], conn_polling: Callable[[], bool],
     ) -> Tuple[int, int, BackupManifest, int, List[Dict[str, Any]]]:
         # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder
         delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta")
@@ -459,7 +473,9 @@ def run(
             sum(len(chunk) for chunk in delta_chunks)
         )
         chunks_max_progress = delta_chunks_count * 100.0 / (delta_chunks_count + len(todo_hexdigests))
-        chunks_metric, chunk_files = self._upload_chunks(delta_chunks, chunks_max_progress=chunks_max_progress)
+        chunks_metric, chunk_files = self._upload_chunks(
+            delta_chunks, chunks_max_progress=chunks_max_progress, conn_polling=conn_polling,
+        )
         self.log.info(
             "Submitting hashes for upload: %r, total hashes in the snapshot: %r",
             len(todo_hexdigests),
diff --git a/pghoard/pgutil.py b/pghoard/pgutil.py
index f379f58b..841d3e09 100644
--- a/pghoard/pgutil.py
+++ b/pghoard/pgutil.py
@@ -5,10 +5,9 @@
 Copyright (c) 2015 Ohmu Ltd
 See LICENSE for details
 """
-
+from psycopg2 import OperationalError
 from urllib.parse import parse_qs, urlparse
 
-
 def create_connection_string(connection_info):
     return " ".join("{}='{}'".format(k, str(v).replace("'", "\\'")) for k, v in sorted(connection_info.items()))
 
@@ -92,3 +91,15 @@ def parse_connection_string_libpq(connection_string):
             value, connection_string = rem, ""
         fields[key] = value
     return fields
+
+
+def check_if_pg_connection_is_alive(db_conn) -> bool:
+    alive = False
+    try:
+        with db_conn.cursor() as cursor:
+            cursor.execute("SELECT 1;")
+            alive = True
+    except OperationalError:
+        alive = False
+
+    return alive
diff --git a/test/basebackup/test_basebackup.py b/test/basebackup/test_basebackup.py
index 961e524d..6f5ce050 100644
--- a/test/basebackup/test_basebackup.py
+++ b/test/basebackup/test_basebackup.py
@@ -88,7 +88,7 @@ def test_find_files(self, db):
         def create_test_files():
             # Create two temporary files on top level and one in global/ that we'll unlink while iterating
             with open(top1, "w") as t1, open(top2, "w") as t2, \
-                    open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3:
+                 open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3:
                 t1.write("t1\n")
                 t2.write("t2\n")
                 s1.write("s1\n")
@@ -270,7 +270,8 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti
 
         storage_config = common.get_object_storage_config(pghoard.config, pghoard.test_site)
         storage = get_transfer(storage_config)
-        backups = storage.list_path(os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
+        backups = storage.list_path(
+            os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
         assert len(backups) > 0
         backups = sorted(backups, key=lambda backup: backup["metadata"]["backup-decision-time"])
         last_backup = backups[-1]
@@ -289,12 +290,12 @@
         assert last_backup["metadata"]["basebackup-mode"] == mode
 
     def _restore_basebackup(
-        self,
-        pghoard: PGHoardForTest,
-        backup_out: str,
-        preserve_until: Optional[str] = None,
-        cancel_preserve_on_success: bool = True,
-        overwrite: bool = False
+            self,
+            pghoard: PGHoardForTest,
+            backup_out: str,
+            preserve_until: Optional[str] = None,
+            cancel_preserve_on_success: bool = True,
+            overwrite: bool = False
     ) -> None:
         # Restoring to empty directory works
         os.makedirs(backup_out, exist_ok=True)
@@ -312,7 +313,7 @@ def _restore_basebackup(
         Restore().run(arguments)
 
     def _test_restore_basebackup(
-        self, db, pghoard, tmpdir, active_backup_mode="archive_command", preserve_until: Optional[str] = None
+            self, db, pghoard, tmpdir, active_backup_mode="archive_command", preserve_until: Optional[str] = None
     ):
         backup_out = tmpdir.join("test-restore").strpath
         self._restore_basebackup(pghoard, backup_out, preserve_until=preserve_until)
@@ -329,7 +330,8 @@ def _test_restore_basebackup(
         # was in the backup label
         storage_config = common.get_object_storage_config(pghoard.config, pghoard.test_site)
         storage = get_transfer(storage_config)
-        backups = storage.list_path(os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
+        backups = storage.list_path(
+            os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
 
         # lets grab the backup label details for what we restored
         pgb = PGBaseBackup(
@@ -362,7 +364,8 @@ def _test_restore_basebackup(
         else:
            assert os.path.isfile(path) is False
 
-    def _test_basebackups(self, capsys, db, pghoard, tmpdir, mode, *, replica=False, preserve_until: Optional[None] = None):
+    def _test_basebackups(self, capsys, db, pghoard, tmpdir, mode, *, replica=False,
+                          preserve_until: Optional[None] = None):
         self._test_create_basebackup(capsys, db, pghoard, mode, replica=replica)
         self._test_restore_basebackup(db, pghoard, tmpdir, preserve_until=preserve_until)
 
@@ -421,7 +424,8 @@ def _run_backup_deletion(self, pghoard: PGHoardForTest) -> None:
     def _check_backups_count(self, pghoard: PGHoardForTest, expected_count: int) -> None:
         storage_config = common.get_object_storage_config(pghoard.config, pghoard.test_site)
         storage = get_transfer(storage_config)
-        backups = storage.list_path(os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
+        backups = storage.list_path(
+            os.path.join(pghoard.config["backup_sites"][pghoard.test_site]["prefix"], "basebackup"))
         assert len(backups) == expected_count
 
     @pytest.mark.parametrize(
@@ -430,8 +434,8 @@ def _check_backups_count(self, pghoard: PGHoardForTest, expected_count: int) ->
          (None, 1024 * 1024 * 10, 3, 0)]
     )
     def test_basebackups_delta_config_params(
-        self, capsys, db, pghoard, tmpdir, delta_file_size: int, delta_chunk_size: int, expected_chunks_count: int,
-        expected_delta_files_count: int
+            self, capsys, db, pghoard, tmpdir, delta_file_size: int, delta_chunk_size: int, expected_chunks_count: int,
+            expected_delta_files_count: int
     ) -> None:
         site = pghoard.test_site
         site_config = pghoard.config["backup_sites"][site]
@@ -932,7 +936,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs):  # pylint: d
             return meta, b"some content"
 
         with patch.object(pgb, "get_remote_basebackups_info") as mock_get_remote_basebackups_info, \
-                patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file):
+             patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file):
             mock_get_remote_basebackups_info.return_value = [{
                 "name": f"backup{idx}",
                 "metadata": {
@@ -946,3 +950,28 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs):  # pylint: d
             "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192,
             "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800
         }
+
+    @pytest.mark.parametrize(
+        "backup_mode", [BaseBackupMode.local_tar, BaseBackupMode.delta, BaseBackupMode.local_tar_delta_stats],
+    )
+    def test_create_basebackup_lost_pg_connection(self, db, pghoard, backup_mode: BaseBackupMode):
+        with patch("pghoard.basebackup.base.check_if_pg_connection_is_alive", return_value=False):
+            pghoard.create_backup_site_paths(pghoard.test_site)
+            basebackup_path = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "basebackup")
+            q: Queue[CallbackEvent] = Queue()
+
+            pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = backup_mode
+            pghoard.config["backup_sites"][pghoard.test_site]["active_backup_mode"] = "archive_command"
+
+            now = datetime.datetime.now(datetime.timezone.utc)
+            metadata = {
+                "backup-reason": BackupReason.scheduled,
+                "backup-decision-time": now.isoformat(),
+                "normalized-backup-time": now.isoformat(),
+            }
+            pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata)
+            result = q.get(timeout=60)
+
+            assert result.success is False
+            assert result.exception and isinstance(result.exception, RuntimeError)
+            assert result.exception.args[0] == "ERROR: PostgreSQL connection was lost during backup process."
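
Note for reviewers (illustrative, not part of the patch): the sketch below shows the liveness-polling pattern this change introduces, i.e. building a conn_polling callable from the new check_if_pg_connection_is_alive() helper and failing fast before each chunk upload. The upload_one_chunk() helper and the connection string are hypothetical stand-ins used only for illustration; in the patch itself the real call path goes through ChunkUploader.create_and_upload_chunks() and handle_single_chunk() as shown in the diff above.

    # Minimal sketch, assuming psycopg2 and the pghoard.pgutil helper added by this patch.
    import psycopg2

    from pghoard.pgutil import check_if_pg_connection_is_alive


    def upload_with_liveness_check(connection_string, chunks, upload_one_chunk):
        # upload_one_chunk(index, chunk) is a hypothetical callable, not part of pghoard.
        with psycopg2.connect(connection_string) as db_conn:
            conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
            for index, chunk in enumerate(chunks):
                # Fail fast: if the backing connection died, the backup session is gone,
                # so uploading further chunks would only produce an unusable basebackup.
                if not conn_polling():
                    raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")
                upload_one_chunk(index, chunk)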