diff --git a/.pylintrc b/.pylintrc index f90a82a0..547aef86 100644 --- a/.pylintrc +++ b/.pylintrc @@ -28,7 +28,7 @@ disable= [FORMAT] max-line-length=125 -max-module-lines=1100 +max-module-lines=1200 [REPORTS] output-format=text diff --git a/README.rst b/README.rst index 242244b5..a70b93c7 100644 --- a/README.rst +++ b/README.rst @@ -717,6 +717,11 @@ How many threads to use for tar, compress and encrypt tasks. Only applies for this, with higher thread count speed improvement is negligible and CPU time is lost switching between threads. +``extra_backup_sites_prefixes`` (default undefined) + +Prefixes for extra backup sites to look into during cleanup. This could be useful +if you want to removed old backups from previous backup sites after a PG upgrade. + ``encryption_key_id`` (no default) Specifies the encryption key used when storing encrypted backups. If this diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index ce124a09..8cbcf597 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -323,7 +323,7 @@ def create_backup_site_paths(self, site: str) -> BackupSitePaths: return backup_site_paths - def delete_remote_wal_before(self, wal_segment, site, pg_version): + def delete_remote_wal_before(self, wal_segment, site, site_prefix, pg_version): self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version) storage = self.site_transfers.get(site) valid_timeline = True @@ -334,7 +334,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): lsn = lsn.previous_walfile_start_lsn if lsn is None: break - wal_path = os.path.join(self._get_site_prefix(site), "xlog", lsn.walfile_name) + wal_path = os.path.join(site_prefix, "xlog", lsn.walfile_name) self.log.debug("Deleting wal_file: %r", wal_path) try: storage.delete_key(wal_path) @@ -360,8 +360,8 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep) -> List: delta_formats = (BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2) assert metadata["format"] in delta_formats - all_hexdigests = set() - keep_hexdigests = set() + all_hexdigests: Dict[str, str] = {} + keep_hexdigests: Dict[str, str] = {} basebackup_data_files = list() delta_backup_names = { @@ -372,7 +372,7 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t delta_backup_names[basebackup_name_to_delete] = metadata for backup_name, backup_metadata in delta_backup_names.items(): - delta_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", backup_name) + delta_backup_key = os.path.join(backup_metadata["site-prefix"], "basebackup", backup_name) meta, _ = download_backup_meta_file( storage=storage, basebackup_path=delta_backup_key, @@ -385,18 +385,22 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t backup_state = snapshot_result["state"] files = backup_state["files"] - backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"]) - all_hexdigests |= backup_hexdigests + backup_hexdigests = { + delta_file["hexdigest"]: backup_metadata["site-prefix"] + for delta_file in files + if delta_file["hexdigest"] + } + all_hexdigests.update(backup_hexdigests) if backup_name != basebackup_name_to_delete: # Keep data file in case if there is still a reference from other backups - keep_hexdigests |= backup_hexdigests + keep_hexdigests.update(backup_hexdigests) else: # Add bundles to remove for chunk in meta.get("chunks", []): basebackup_data_files.append( os.path.join( - self._get_site_prefix(site), + backup_metadata["site-prefix"], FileTypePrefixes[FileType.Basebackup_delta_chunk], chunk["chunk_filename"], ) @@ -405,14 +409,14 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t # Remove unreferenced files extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests) for hexdigest in extra_hexdigests: - basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest)) + basebackup_data_files.append(os.path.join(all_hexdigests[hexdigest], "basebackup_delta", hexdigest)) return basebackup_data_files def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): start_time = time.monotonic() storage = self.site_transfers.get(site) - main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup) + main_backup_key = os.path.join(metadata["site-prefix"], "basebackup", basebackup) basebackup_data_files = [main_backup_key] if metadata.get("format") == BaseBackupFormat.v2: @@ -427,7 +431,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): for chunk in bmeta["chunks"]: basebackup_data_files.append( os.path.join( - self._get_site_prefix(site), + metadata["site-prefix"], "basebackup_chunk", chunk["chunk_filename"], ) @@ -457,14 +461,15 @@ def get_or_create_site_storage(self, site): self.site_transfers[site] = storage return storage - def get_remote_basebackups_info(self, site): + def get_remote_basebackups_info(self, site, site_prefix=None): storage = self.get_or_create_site_storage(site=site) site_config = self.config["backup_sites"][site] - results = storage.list_path(os.path.join(site_config["prefix"], "basebackup")) + site_prefix = site_prefix or site_config["prefix"] + results = storage.list_path(os.path.join(site_prefix, "basebackup")) for entry in results: self.patch_basebackup_info(entry=entry, site_config=site_config) - preservation_requests = storage.list_path(os.path.join(site_config["prefix"], "preservation_request")) + preservation_requests = storage.list_path(os.path.join(site_prefix, "preservation_request")) backups_to_preserve = parse_preservation_requests(preservation_requests) for entry in results: patch_basebackup_metadata_with_preservation(entry, backups_to_preserve) @@ -517,7 +522,7 @@ def determine_backups_to_delete(self, *, basebackups, site_config): if max_age_days and min_backups > 0: while basebackups and len(basebackups) > min_backups: if is_basebackup_preserved(basebackups[0], now): - self.log.info("Not deleting more backups because %r still needs to preserved", basebackups[0]["name"]) + self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"]) break # For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when # backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old. @@ -539,31 +544,66 @@ def determine_backups_to_delete(self, *, basebackups, site_config): def refresh_backup_list_and_delete_old(self, site): """Look up basebackups from the object store, prune any extra - backups and return the datetime of the latest backup.""" - basebackups = self.get_remote_basebackups_info(site) - self.log.debug("Found %r basebackups", basebackups) + backups from the current and the extra backup sites and update + the state with the up-to-date backups list.""" + current_basebackups = self.get_remote_basebackups_info(site) + current_site_prefix = self._get_site_prefix(site) + for basebackup in current_basebackups: + basebackup["metadata"]["site-prefix"] = current_site_prefix + + # If `extra_backup_sites_prefixes` is set, let's also check those sites for backups that are due for cleanup. + extra_basebackups = [] + if self.config.get("extra_backup_sites_prefixes"): + new_extra_backup_sites_prefixes = [] + extra_backup_sites_prefixes = self.state["extra_backup_sites_prefixes"] + + for site_prefix in extra_backup_sites_prefixes: + extra_site_basebackups = self.get_remote_basebackups_info(site, site_prefix=site_prefix) + if not extra_site_basebackups: + continue + for basebackup in extra_site_basebackups: + basebackup["metadata"]["site-prefix"] = site_prefix + extra_basebackups.extend(extra_site_basebackups) + # We found some basebackups in this site, so let's include it in the next round of checks as well. + new_extra_backup_sites_prefixes.append(site_prefix) + + self.state["extra_backup_sites_prefixes"] = new_extra_backup_sites_prefixes + + extra_basebackups.sort(key=lambda entry: entry["metadata"]["start-time"]) + all_basebackups = extra_basebackups + current_basebackups + + self.log.debug("Found %r basebackups", all_basebackups) site_config = self.config["backup_sites"][site] # Never delete backups from a recovery site. This check is already elsewhere as well # but still check explicitly here to ensure we certainly won't delete anything unexpectedly if site_config["active"]: - basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config) + basebackups_to_delete = self.determine_backups_to_delete(basebackups=all_basebackups, site_config=site_config) for basebackup_to_be_deleted in basebackups_to_delete: - pg_version_str = basebackup_to_be_deleted["metadata"].get("pg-version") - pg_version = None if pg_version_str is None else int(pg_version_str) - last_wal_segment_still_needed = 0 - if basebackups: - last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"] - - if last_wal_segment_still_needed: - # This is breaking concurrent PITR starting from the *previous* backup. - # That's why once a backup is preserved, we keep that backup and all the next ones. - self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version) - self.delete_remote_basebackup( - site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups - ) - self.state["backup_sites"][site]["basebackups"] = basebackups + metadata = basebackup_to_be_deleted["metadata"] + pg_version = metadata.get("pg-version") and int(metadata.get("pg-version")) + # When we delete a basebackup, let's also delete any WAL segments before its `start-wal-segment`. + self.delete_remote_wal_before(metadata["start-wal-segment"], site, metadata["site-prefix"], pg_version) + self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], metadata, basebackups=all_basebackups) + + # Delete WAL segments that are before `start-wal-segment` of the oldest still kept basebackup. + # This oldest kept basebackup could be from the current or from one of the extra sites when set. + oldest_wal_segment_to_keep = "" + if all_basebackups: + metadata = all_basebackups[0]["metadata"] + site_prefix = metadata["site-prefix"] + oldest_wal_segment_to_keep = metadata["start-wal-segment"] + pg_version = metadata.get("pg-version") and int(metadata.get("pg-version")) + + if oldest_wal_segment_to_keep: + # This is breaking concurrent PITR starting from the *previous* backup. + # That's why once a backup is preserved, we keep that backup and all the next ones. + self.delete_remote_wal_before(oldest_wal_segment_to_keep, site, site_prefix, pg_version) + + # Update pghoard state with the kept basebackups, but only the ones that are from the current site. + new_basebackups = [backup for backup in all_basebackups if backup["metadata"]["site-prefix"] == current_site_prefix] + self.state["backup_sites"][site]["basebackups"] = new_basebackups def get_normalized_backup_time(self, site_config, *, now=None): """Returns the closest historical backup time that current time matches to (or current time if it matches). @@ -589,6 +629,8 @@ def get_normalized_backup_time(self, site_config, *, now=None): def set_state_defaults(self, site): if site not in self.state["backup_sites"]: self.state["backup_sites"][site] = {"basebackups": []} + if "extra_backup_sites_prefixes" not in self.state: + self.state["extra_backup_sites_prefixes"] = self.config.get("extra_backup_sites_prefixes", []) def startup_walk_for_missed_files(self): """Check xlog and xlog_incoming directories for files that receivexlog has received but not yet diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 37ad3b46..79f96421 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -43,6 +43,10 @@ def setup_method(self, method): ], }, }, + "extra_backup_sites_prefixes": [ + f"extra_site_1_{self.test_site}", + f"extra_site_2_{self.test_site}", + ], }) config_path = os.path.join(self.temp_dir, "pghoard.json") write_json_file(config_path, self.config) @@ -117,6 +121,62 @@ def test_get_local_basebackups_info(self): assert basebackups[1]["name"] == "2015-07-02_10" assert basebackups[2]["name"] == "2015-07-03_0" + def test_get_local_basebackups_info_from_an_extra_site(self): + extra_site_prefix = f"extra_site_{self.test_site}" + extra_site_basebackup_storage_path = os.path.join( + self.config["backup_sites"][self.test_site]["object_storage"]["directory"], + extra_site_prefix, + "basebackup", + ) + os.makedirs(extra_site_basebackup_storage_path) + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + assert self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=extra_site_prefix) == [] + + # Handle the case where metadata file does not exist. + bb_path = os.path.join(extra_site_basebackup_storage_path, "2015-07-03_0") + with open(bb_path, "wb") as fp: + fp.write(b"something") + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + assert self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=extra_site_prefix) == [] + + metadata_file_path = bb_path + ".metadata" + with open(metadata_file_path, "w") as fp: + json.dump({"_hash": "abc", "start-time": "2015-07-03 12:00:00+00:00"}, fp) + + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + available_backup = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=extra_site_prefix)[0] + assert available_backup["name"] == "2015-07-03_0" + start_time = datetime.datetime(2015, 7, 3, 12, tzinfo=datetime.timezone.utc) + assert available_backup["metadata"]["start-time"] == start_time + assert available_backup["metadata"]["backup-reason"] == BackupReason.scheduled + assert available_backup["metadata"]["normalized-backup-time"] is None + assert available_backup["metadata"]["backup-decision-time"] + + bb_path = os.path.join(extra_site_basebackup_storage_path, "2015-07-02_9") + metadata_file_path = bb_path + ".metadata" + with open(bb_path, "wb") as fp: + fp.write(b"something") + with open(metadata_file_path, "w") as fp: + json.dump({"_hash": "abc", "start-time": "2015-07-02 12:00:00+00:00"}, fp) + + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=extra_site_prefix) + assert basebackups[0]["name"] == "2015-07-02_9" + assert basebackups[1]["name"] == "2015-07-03_0" + + bb_path = os.path.join(extra_site_basebackup_storage_path, "2015-07-02_10") + metadata_file_path = bb_path + ".metadata" + with open(bb_path, "wb") as fp: + fp.write(b"something") + with open(metadata_file_path, "w") as fp: + json.dump({"_hash": "abc", "start-time": "2015-07-02 22:00:00+00"}, fp) + + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=extra_site_prefix) + assert basebackups[0]["name"] == "2015-07-02_9" + assert basebackups[1]["name"] == "2015-07-02_10" + assert basebackups[2]["name"] == "2015-07-03_0" + def test_determine_backups_to_delete(self): now = datetime.datetime.now(datetime.timezone.utc) bbs = [ @@ -341,15 +401,18 @@ def test_determine_backups_to_delete_with_preserve_when_older_than_max_age(self) assert to_delete == [bbs[0]] def test_local_refresh_backup_list_and_delete_old(self): - basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") - wal_storage_path = os.path.join(self.local_storage_dir, "xlog") - os.makedirs(basebackup_storage_path) - os.makedirs(wal_storage_path) - self.pghoard.set_state_defaults(self.test_site) assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] - def write_backup_and_wal_files(what): + base_local_storage_dir = self.config["backup_sites"][self.test_site]["object_storage"]["directory"] + os.makedirs(base_local_storage_dir) + + def write_backup_and_wal_files(what, site_dir): + basebackup_storage_path = os.path.join(base_local_storage_dir, site_dir, "basebackup") + wal_storage_path = os.path.join(base_local_storage_dir, site_dir, "xlog") + os.makedirs(basebackup_storage_path, exist_ok=True) + os.makedirs(wal_storage_path, exist_ok=True) + for bb, wals in what.items(): if bb: bb_path = os.path.join(basebackup_storage_path, bb) @@ -388,13 +451,84 @@ def write_backup_and_wal_files(what): "000000040000000B00000004", ], } - write_backup_and_wal_files(backups_and_wals) + + extra_site_1_backups_and_wals = { + "2015-08-23_0": [ + # NOTE: gap between this and next segment means that cleanup shouldn't find this + "000000010000000A000000DE", + ], + "2015-08-23_1": [ + "000000010000000A000000E0", + "000000010000000A000000E1", + ], + "2015-08-23_2": [ + "000000010000000A000000E2", + "000000010000000A000000E3", + ], + } + + extra_site_2_backups_and_wals = { + "2015-08-24_0": [ + "000000010000000A000000E7", + ], + "2015-08-24_1": [ + "000000010000000A000000E8", + "000000010000000A000000E9", + ], + "2015-08-24_2": [ + "000000010000000A000000EA", + "000000010000000A000000EB", + "000000010000000A000000EC", + "000000010000000A000000ED", + ], + } + + extra_site_1 = f"extra_site_1_{self.test_site}" + extra_site_2 = f"extra_site_2_{self.test_site}" + write_backup_and_wal_files(backups_and_wals, self.test_site) + write_backup_and_wal_files(extra_site_1_backups_and_wals, extra_site_1) + write_backup_and_wal_files(extra_site_2_backups_and_wals, extra_site_2) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 4 + assert {backup["name"] for backup in basebackups} == backups_and_wals.keys() + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 + assert basebackups[0]["name"] == "2015-08-25_3" + basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_3", "2015-08-25_3.metadata"} + wal_storage_path = os.path.join(self.local_storage_dir, "xlog") assert len(os.listdir(wal_storage_path)) == 3 + assert set(os.listdir(wal_storage_path)) == { + "000000010000000A000000FB", + "000000040000000B00000003", + "000000040000000B00000004", + } + + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=f"extra_site_1_{self.test_site}") + assert len(basebackups) == 0 + extra_site_1_wal_storage_path = os.path.join(base_local_storage_dir, extra_site_1, "xlog") + assert len(os.listdir(extra_site_1_wal_storage_path)) == 3 + assert set(os.listdir(extra_site_1_wal_storage_path)) == { + "000000010000000A000000DE", + "000000010000000A000000E2", + "000000010000000A000000E3", + } + + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site, site_prefix=f"extra_site_2_{self.test_site}") + assert len(basebackups) == 0 + extra_site_2_wal_storage_path = os.path.join(base_local_storage_dir, extra_site_2, "xlog") + assert len(os.listdir(extra_site_2_wal_storage_path)) == 4 + assert set(os.listdir(extra_site_2_wal_storage_path)) == { + "000000010000000A000000EA", + "000000010000000A000000EB", + "000000010000000A000000EC", + "000000010000000A000000ED", + } + # Put all WAL segments between 1 and 9 in place to see that they're deleted and we don't try to go back # any further from TLI 1. Note that timeline 3 is now "empty" so deletion shouldn't touch timelines 2 # or 1. @@ -412,15 +546,20 @@ def write_backup_and_wal_files(what): "000000040000000B00000005", ], } - write_backup_and_wal_files(new_backups_and_wals) + write_backup_and_wal_files(new_backups_and_wals, self.local_storage_dir) assert len(os.listdir(wal_storage_path)) == 11 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 - expected_wal_count = len(backups_and_wals["2015-08-25_0"]) - expected_wal_count += len(new_backups_and_wals[""]) - expected_wal_count += len(new_backups_and_wals["2015-08-25_4"]) - assert len(os.listdir(wal_storage_path)) == expected_wal_count + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_4", "2015-08-25_4.metadata"} + expected_wal_segments = ( + backups_and_wals["2015-08-25_0"] + new_backups_and_wals[""] + new_backups_and_wals["2015-08-25_4"] + ) + assert len(expected_wal_segments) == 9 + assert set(os.listdir(wal_storage_path)) == set(expected_wal_segments) + # Now put WAL files in place with no gaps anywhere gapless_backups_and_wals = { "2015-08-25_3": [ @@ -431,12 +570,16 @@ def write_backup_and_wal_files(what): "000000040000000B00000005", ], } - write_backup_and_wal_files(gapless_backups_and_wals) - assert len(os.listdir(wal_storage_path)) >= 10 + write_backup_and_wal_files(gapless_backups_and_wals, self.local_storage_dir) + assert len(os.listdir(wal_storage_path)) == 11 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) assert len(basebackups) == 1 + assert set(os.listdir(basebackup_storage_path)) == {"2015-08-25_4", "2015-08-25_4.metadata"} assert len(os.listdir(wal_storage_path)) == 1 + assert set(os.listdir(wal_storage_path)) == {"000000040000000B00000005"} def test_local_refresh_backup_list_and_delete_old_delta_format(self): basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") @@ -555,25 +698,29 @@ def test_get_delta_basebackup_files(self, backup_to_delete: str, backup_to_delet { "name": "bb_v1", "metadata": { - "format": BaseBackupFormat.v1 + "format": BaseBackupFormat.v1, + "site-prefix": self.test_site, }, }, { "name": "bb_v2", "metadata": { - "format": BaseBackupFormat.v2 + "format": BaseBackupFormat.v2, + "site-prefix": self.test_site, }, }, { "name": "delta_v1", "metadata": { - "format": BaseBackupFormat.delta_v1 + "format": BaseBackupFormat.delta_v1, + "site-prefix": self.test_site, }, }, { "name": "delta_v2", "metadata": { - "format": BaseBackupFormat.delta_v2 + "format": BaseBackupFormat.delta_v2, + "site-prefix": self.test_site, }, }, ] @@ -645,7 +792,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d self.pghoard._get_delta_basebackup_files( # pylint: disable=protected-access site=self.test_site, storage=Mock(), - metadata=backup_to_delete_meta, + metadata={**backup_to_delete_meta, "site-prefix": self.test_site}, basebackup_name_to_delete=backup_to_delete, backups_to_keep=backups_to_keep ) @@ -654,7 +801,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d res = self.pghoard._get_delta_basebackup_files( # pylint: disable=protected-access site=self.test_site, storage=Mock(), - metadata=backup_to_delete_meta, + metadata={**backup_to_delete_meta, "site-prefix": self.test_site}, basebackup_name_to_delete=backup_to_delete, backups_to_keep=backups_to_keep )