From b6eb90d8b01e21572144a48a3c8ab6da5523b8c4 Mon Sep 17 00:00:00 2001 From: Ahmed Et-tanany Date: Mon, 10 Jun 2024 14:58:00 +0200 Subject: [PATCH] Support cleaning up extra backup sites This adds a new setting called `extra_backup_sites_prefixes`, which when set, makes cleaning process look into those extra sites as well, and this is particularly useful when we have old backups from a previous backup site after a PG major version upgrade. --- .pylintrc | 2 +- README.rst | 5 ++ pghoard/pghoard.py | 87 +++++++++++++++++++--------- test/test_pghoard.py | 131 ++++++++++++++++++++++++++++++++++++------- 4 files changed, 177 insertions(+), 48 deletions(-) diff --git a/.pylintrc b/.pylintrc index f90a82a0..547aef86 100644 --- a/.pylintrc +++ b/.pylintrc @@ -28,7 +28,7 @@ disable= [FORMAT] max-line-length=125 -max-module-lines=1100 +max-module-lines=1200 [REPORTS] output-format=text diff --git a/README.rst b/README.rst index 4f7fa941..2eeddcff 100644 --- a/README.rst +++ b/README.rst @@ -723,6 +723,11 @@ How many threads to use for tar, compress and encrypt tasks. Only applies for this, with higher thread count speed improvement is negligible and CPU time is lost switching between threads. +``extra_backup_sites_prefixes`` (default undefined) + +Prefixes for extra backup sites to look into during cleanup. This could be useful +if you want to removed old backups from previous backup sites after a PG upgrade. + ``encryption_key_id`` (no default) Specifies the encryption key used when storing encrypted backups. If this diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index ce124a09..96e845e2 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -323,7 +323,7 @@ def create_backup_site_paths(self, site: str) -> BackupSitePaths: return backup_site_paths - def delete_remote_wal_before(self, wal_segment, site, pg_version): + def delete_remote_wal_before(self, wal_segment, site, site_prefix, pg_version): self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version) storage = self.site_transfers.get(site) valid_timeline = True @@ -334,7 +334,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): lsn = lsn.previous_walfile_start_lsn if lsn is None: break - wal_path = os.path.join(self._get_site_prefix(site), "xlog", lsn.walfile_name) + wal_path = os.path.join(site_prefix, "xlog", lsn.walfile_name) self.log.debug("Deleting wal_file: %r", wal_path) try: storage.delete_key(wal_path) @@ -360,8 +360,8 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep) -> List: delta_formats = (BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2) assert metadata["format"] in delta_formats - all_hexdigests = set() - keep_hexdigests = set() + all_hexdigests: Dict[str, str] = {} + keep_hexdigests: Dict[str, str] = {} basebackup_data_files = list() delta_backup_names = { @@ -372,7 +372,7 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t delta_backup_names[basebackup_name_to_delete] = metadata for backup_name, backup_metadata in delta_backup_names.items(): - delta_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", backup_name) + delta_backup_key = os.path.join(backup_metadata["site-prefix"], "basebackup", backup_name) meta, _ = download_backup_meta_file( storage=storage, basebackup_path=delta_backup_key, @@ -385,18 +385,22 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t backup_state = snapshot_result["state"] files = backup_state["files"] - backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"]) - all_hexdigests |= backup_hexdigests + backup_hexdigests = { + delta_file["hexdigest"]: backup_metadata["site-prefix"] + for delta_file in files + if delta_file["hexdigest"] + } + all_hexdigests.update(backup_hexdigests) if backup_name != basebackup_name_to_delete: # Keep data file in case if there is still a reference from other backups - keep_hexdigests |= backup_hexdigests + keep_hexdigests.update(backup_hexdigests) else: # Add bundles to remove for chunk in meta.get("chunks", []): basebackup_data_files.append( os.path.join( - self._get_site_prefix(site), + backup_metadata["site-prefix"], FileTypePrefixes[FileType.Basebackup_delta_chunk], chunk["chunk_filename"], ) @@ -405,16 +409,21 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t # Remove unreferenced files extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests) for hexdigest in extra_hexdigests: - basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest)) + basebackup_data_files.append(os.path.join(all_hexdigests[hexdigest], "basebackup_delta", hexdigest)) return basebackup_data_files def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): start_time = time.monotonic() storage = self.site_transfers.get(site) - main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup) + main_backup_key = os.path.join(metadata["site-prefix"], "basebackup", basebackup) basebackup_data_files = [main_backup_key] + # When we delete a basebackup, let's also delete any WAL segments before its `start-wal-segment`. + pg_version_str = metadata.get("pg-version") + pg_version = None if pg_version_str is None else int(pg_version_str) + self.delete_remote_wal_before(metadata["start-wal-segment"], site, metadata["site-prefix"], pg_version) + if metadata.get("format") == BaseBackupFormat.v2: bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0] with rohmufile.file_reader( @@ -427,7 +436,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): for chunk in bmeta["chunks"]: basebackup_data_files.append( os.path.join( - self._get_site_prefix(site), + metadata["site-prefix"], "basebackup_chunk", chunk["chunk_filename"], ) @@ -457,14 +466,15 @@ def get_or_create_site_storage(self, site): self.site_transfers[site] = storage return storage - def get_remote_basebackups_info(self, site): + def get_remote_basebackups_info(self, site, site_prefix=None): storage = self.get_or_create_site_storage(site=site) site_config = self.config["backup_sites"][site] - results = storage.list_path(os.path.join(site_config["prefix"], "basebackup")) + site_prefix = site_prefix or site_config["prefix"] + results = storage.list_path(os.path.join(site_prefix, "basebackup")) for entry in results: self.patch_basebackup_info(entry=entry, site_config=site_config) - preservation_requests = storage.list_path(os.path.join(site_config["prefix"], "preservation_request")) + preservation_requests = storage.list_path(os.path.join(site_prefix, "preservation_request")) backups_to_preserve = parse_preservation_requests(preservation_requests) for entry in results: patch_basebackup_metadata_with_preservation(entry, backups_to_preserve) @@ -517,7 +527,7 @@ def determine_backups_to_delete(self, *, basebackups, site_config): if max_age_days and min_backups > 0: while basebackups and len(basebackups) > min_backups: if is_basebackup_preserved(basebackups[0], now): - self.log.info("Not deleting more backups because %r still needs to preserved", basebackups[0]["name"]) + self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"]) break # For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when # backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old. @@ -539,31 +549,54 @@ def determine_backups_to_delete(self, *, basebackups, site_config): def refresh_backup_list_and_delete_old(self, site): """Look up basebackups from the object store, prune any extra - backups and return the datetime of the latest backup.""" - basebackups = self.get_remote_basebackups_info(site) - self.log.debug("Found %r basebackups", basebackups) + backups from the current and the extra backup sites and update + the state with the up-to-date backups list.""" + current_basebackups = self.get_remote_basebackups_info(site) + current_site_prefix = self._get_site_prefix(site) + for basebackup in current_basebackups: + basebackup["metadata"]["site-prefix"] = current_site_prefix + + # If `extra_backup_sites_prefixes` is set, let's also check those sites for backups that are due for cleanup. + extra_basebackups = [] + if self.config.get("extra_backup_sites_prefixes"): + extra_backup_sites_prefixes = self.config["extra_backup_sites_prefixes"] + for site_prefix in extra_backup_sites_prefixes: + extra_site_basebackups = self.get_remote_basebackups_info(site, site_prefix=site_prefix) + for basebackup in extra_site_basebackups: + basebackup["metadata"]["site-prefix"] = site_prefix + extra_basebackups.extend(extra_site_basebackups) + + extra_basebackups.sort(key=lambda entry: entry["metadata"]["start-time"]) + all_basebackups = extra_basebackups + current_basebackups + + self.log.debug("Found %r basebackups", all_basebackups) site_config = self.config["backup_sites"][site] # Never delete backups from a recovery site. This check is already elsewhere as well # but still check explicitly here to ensure we certainly won't delete anything unexpectedly if site_config["active"]: - basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config) + basebackups_to_delete = self.determine_backups_to_delete(basebackups=all_basebackups, site_config=site_config) for basebackup_to_be_deleted in basebackups_to_delete: pg_version_str = basebackup_to_be_deleted["metadata"].get("pg-version") pg_version = None if pg_version_str is None else int(pg_version_str) - last_wal_segment_still_needed = 0 - if basebackups: - last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"] + oldest_wal_segment_to_keep = "" + if all_basebackups: + oldest_wal_segment_to_keep = all_basebackups[0]["metadata"]["start-wal-segment"] - if last_wal_segment_still_needed: + if oldest_wal_segment_to_keep: # This is breaking concurrent PITR starting from the *previous* backup. # That's why once a backup is preserved, we keep that backup and all the next ones. - self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version) + self.delete_remote_wal_before( + oldest_wal_segment_to_keep, site, basebackup_to_be_deleted["metadata"]["site-prefix"], pg_version + ) self.delete_remote_basebackup( - site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups + site, + basebackup_to_be_deleted["name"], + basebackup_to_be_deleted["metadata"], + basebackups=all_basebackups ) - self.state["backup_sites"][site]["basebackups"] = basebackups + self.state["backup_sites"][site]["basebackups"] = current_basebackups def get_normalized_backup_time(self, site_config, *, now=None): """Returns the closest historical backup time that current time matches to (or current time if it matches). diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 37ad3b46..0cbdd97b 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -43,6 +43,10 @@ def setup_method(self, method): ], }, }, + "extra_backup_sites_prefixes": [ + f"extra_site_1_{self.test_site}", + f"extra_site_2_{self.test_site}", + ], }) config_path = os.path.join(self.temp_dir, "pghoard.json") write_json_file(config_path, self.config) @@ -341,15 +345,18 @@ def test_determine_backups_to_delete_with_preserve_when_older_than_max_age(self) assert to_delete == [bbs[0]] def test_local_refresh_backup_list_and_delete_old(self): - basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") - wal_storage_path = os.path.join(self.local_storage_dir, "xlog") - os.makedirs(basebackup_storage_path) - os.makedirs(wal_storage_path) - self.pghoard.set_state_defaults(self.test_site) assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] - def write_backup_and_wal_files(what): + base_local_storage_dir = self.config["backup_sites"][self.test_site]["object_storage"]["directory"] + os.makedirs(base_local_storage_dir) + + def write_backup_and_wal_files(what, site_dir): + basebackup_storage_path = os.path.join(base_local_storage_dir, site_dir, "basebackup") + wal_storage_path = os.path.join(base_local_storage_dir, site_dir, "xlog") + os.makedirs(basebackup_storage_path, exist_ok=True) + os.makedirs(wal_storage_path, exist_ok=True) + for bb, wals in what.items(): if bb: bb_path = os.path.join(basebackup_storage_path, bb) @@ -388,13 +395,84 @@ def write_backup_and_wal_files(what): "000000040000000B00000004", ], } - write_backup_and_wal_files(backups_and_wals) + + extra_site_1_backups_and_wals = { + "2015-08-23_0": [ + # NOTE: gap between this and next segment means that cleanup shouldn't find this + "000000010000000A000000DE", + ], + "2015-08-23_1": [ + "000000010000000A000000E0", + "000000010000000A000000E1", + ], + "2015-08-23_2": [ + "000000010000000A000000E2", + "000000010000000A000000E3", + ], + } + + extra_site_2_backups_and_wals = { + "2015-08-24_0": [ + "000000010000000A000000E7", + ], + "2015-08-24_1": [ + "000000010000000A000000E8", + "000000010000000A000000E9", + ], + "2015-08-24_2": [ + "000000010000000A000000EA", + "000000010000000A000000EB", + "000000010000000A000000EC", + "000000010000000A000000ED", + ], + } + + extra_site_1 = f"extra_site_1_{self.test_site}" + extra_site_2 = f"extra_site_2_{self.test_site}" + write_backup_and_wal_files(backups_and_wals, self.test_site) + write_backup_and_wal_files(extra_site_1_backups_and_wals, extra_site_1) + write_backup_and_wal_files(extra_site_2_backups_and_wals, extra_site_2) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 4 + assert {backup["name"] for backup in basebackups} == backups_and_wals.keys() + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 + assert basebackups[0]["name"] == "2015-08-25_3" + basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_3", "2015-08-25_3.metadata"} + wal_storage_path = os.path.join(self.local_storage_dir, "xlog") assert len(os.listdir(wal_storage_path)) == 3 + assert set(os.listdir(wal_storage_path)) == { + "000000010000000A000000FB", + "000000040000000B00000003", + "000000040000000B00000004", + } + + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=f"extra_site_1_{self.test_site}") + assert len(basebackups) == 0 + extra_site_1_wal_storage_path = os.path.join(base_local_storage_dir, extra_site_1, "xlog") + assert len(os.listdir(extra_site_1_wal_storage_path)) == 3 + assert set(os.listdir(extra_site_1_wal_storage_path)) == { + "000000010000000A000000DE", + "000000010000000A000000E2", + "000000010000000A000000E3", + } + + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=f"extra_site_2_{self.test_site}") + assert len(basebackups) == 0 + extra_site_2_wal_storage_path = os.path.join(base_local_storage_dir, extra_site_2, "xlog") + assert len(os.listdir(extra_site_2_wal_storage_path)) == 4 + assert set(os.listdir(extra_site_2_wal_storage_path)) == { + "000000010000000A000000EA", + "000000010000000A000000EB", + "000000010000000A000000EC", + "000000010000000A000000ED", + } + # Put all WAL segments between 1 and 9 in place to see that they're deleted and we don't try to go back # any further from TLI 1. Note that timeline 3 is now "empty" so deletion shouldn't touch timelines 2 # or 1. @@ -412,15 +490,20 @@ def write_backup_and_wal_files(what): "000000040000000B00000005", ], } - write_backup_and_wal_files(new_backups_and_wals) + write_backup_and_wal_files(new_backups_and_wals, self.local_storage_dir) assert len(os.listdir(wal_storage_path)) == 11 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 - expected_wal_count = len(backups_and_wals["2015-08-25_0"]) - expected_wal_count += len(new_backups_and_wals[""]) - expected_wal_count += len(new_backups_and_wals["2015-08-25_4"]) - assert len(os.listdir(wal_storage_path)) == expected_wal_count + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_4", "2015-08-25_4.metadata"} + expected_wal_segments = ( + backups_and_wals["2015-08-25_0"] + new_backups_and_wals[""] + new_backups_and_wals["2015-08-25_4"] + ) + assert len(expected_wal_segments) == 9 + assert set(os.listdir(wal_storage_path)) == set(expected_wal_segments) + # Now put WAL files in place with no gaps anywhere gapless_backups_and_wals = { "2015-08-25_3": [ @@ -431,12 +514,16 @@ def write_backup_and_wal_files(what): "000000040000000B00000005", ], } - write_backup_and_wal_files(gapless_backups_and_wals) - assert len(os.listdir(wal_storage_path)) >= 10 + write_backup_and_wal_files(gapless_backups_and_wals, self.local_storage_dir) + assert len(os.listdir(wal_storage_path)) == 11 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_4", "2015-08-25_4.metadata"} assert len(os.listdir(wal_storage_path)) == 1 + assert set(os.listdir(wal_storage_path)) == {"000000040000000B00000005"} def test_local_refresh_backup_list_and_delete_old_delta_format(self): basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") @@ -555,25 +642,29 @@ def test_get_delta_basebackup_files(self, backup_to_delete: str, backup_to_delet { "name": "bb_v1", "metadata": { - "format": BaseBackupFormat.v1 + "format": BaseBackupFormat.v1, + "site-prefix": self.test_site, }, }, { "name": "bb_v2", "metadata": { - "format": BaseBackupFormat.v2 + "format": BaseBackupFormat.v2, + "site-prefix": self.test_site, }, }, { "name": "delta_v1", "metadata": { - "format": BaseBackupFormat.delta_v1 + "format": BaseBackupFormat.delta_v1, + "site-prefix": self.test_site, }, }, { "name": "delta_v2", "metadata": { - "format": BaseBackupFormat.delta_v2 + "format": BaseBackupFormat.delta_v2, + "site-prefix": self.test_site, }, }, ] @@ -645,7 +736,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d self.pghoard._get_delta_basebackup_files( # pylint: disable=protected-access site=self.test_site, storage=Mock(), - metadata=backup_to_delete_meta, + metadata={**backup_to_delete_meta, "site-prefix": self.test_site}, basebackup_name_to_delete=backup_to_delete, backups_to_keep=backups_to_keep ) @@ -654,7 +745,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d res = self.pghoard._get_delta_basebackup_files( # pylint: disable=protected-access site=self.test_site, storage=Mock(), - metadata=backup_to_delete_meta, + metadata={**backup_to_delete_meta, "site-prefix": self.test_site}, basebackup_name_to_delete=backup_to_delete, backups_to_keep=backups_to_keep )