Skip to content

Commit

Permalink
Support cleaning up extra backup sites
Browse files Browse the repository at this point in the history
This adds a new setting called `extra_backup_sites_prefixes`, which when
set, makes cleaning process look into those extra sites as well, and
this is particularly useful when we have old backups from a previous
backup site after a PG major version upgrade.
  • Loading branch information
ettanany committed Jun 12, 2024
1 parent a1da446 commit b9980a9
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ disable=

[FORMAT]
max-line-length=125
max-module-lines=1100
max-module-lines=1200

[REPORTS]
output-format=text
Expand Down
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ How many threads to use for tar, compress and encrypt tasks. Only applies for
this, with higher thread count speed improvement is negligible and CPU time is
lost switching between threads.

``extra_backup_sites_prefixes`` (default undefined)

Prefixes for extra backup sites to look into during cleanup. This could be useful
if you want to removed old backups from previous backup sites after a PG upgrade.

``encryption_key_id`` (no default)

Specifies the encryption key used when storing encrypted backups. If this
Expand Down
97 changes: 70 additions & 27 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def create_backup_site_paths(self, site: str) -> BackupSitePaths:

return backup_site_paths

def delete_remote_wal_before(self, wal_segment, site, pg_version):
def delete_remote_wal_before(self, wal_segment, site, site_prefix, pg_version):
self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version)
storage = self.site_transfers.get(site)
valid_timeline = True
Expand All @@ -334,7 +334,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
lsn = lsn.previous_walfile_start_lsn
if lsn is None:
break
wal_path = os.path.join(self._get_site_prefix(site), "xlog", lsn.walfile_name)
wal_path = os.path.join(site_prefix, "xlog", lsn.walfile_name)
self.log.debug("Deleting wal_file: %r", wal_path)
try:
storage.delete_key(wal_path)
Expand All @@ -360,8 +360,8 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep) -> List:
delta_formats = (BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2)
assert metadata["format"] in delta_formats
all_hexdigests = set()
keep_hexdigests = set()
all_hexdigests: Dict[str, str] = {}
keep_hexdigests: Dict[str, str] = {}

basebackup_data_files = list()
delta_backup_names = {
Expand All @@ -372,7 +372,7 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
delta_backup_names[basebackup_name_to_delete] = metadata

for backup_name, backup_metadata in delta_backup_names.items():
delta_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", backup_name)
delta_backup_key = os.path.join(backup_metadata["site-prefix"], "basebackup", backup_name)
meta, _ = download_backup_meta_file(
storage=storage,
basebackup_path=delta_backup_key,
Expand All @@ -385,18 +385,22 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
backup_state = snapshot_result["state"]
files = backup_state["files"]

backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"])
all_hexdigests |= backup_hexdigests
backup_hexdigests = {
delta_file["hexdigest"]: backup_metadata["site-prefix"]
for delta_file in files
if delta_file["hexdigest"]
}
all_hexdigests.update(backup_hexdigests)

if backup_name != basebackup_name_to_delete:
# Keep data file in case if there is still a reference from other backups
keep_hexdigests |= backup_hexdigests
keep_hexdigests.update(backup_hexdigests)
else:
# Add bundles to remove
for chunk in meta.get("chunks", []):
basebackup_data_files.append(
os.path.join(
self._get_site_prefix(site),
backup_metadata["site-prefix"],
FileTypePrefixes[FileType.Basebackup_delta_chunk],
chunk["chunk_filename"],
)
Expand All @@ -405,16 +409,21 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
# Remove unreferenced files
extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests)
for hexdigest in extra_hexdigests:
basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest))
basebackup_data_files.append(os.path.join(all_hexdigests[hexdigest], "basebackup_delta", hexdigest))

return basebackup_data_files

def delete_remote_basebackup(self, site, basebackup, metadata, basebackups):
start_time = time.monotonic()
storage = self.site_transfers.get(site)
main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup)
main_backup_key = os.path.join(metadata["site-prefix"], "basebackup", basebackup)
basebackup_data_files = [main_backup_key]

# When we delete a basebackup, let's also delete any WAL segments before its `start-wal-segment`.
pg_version_str = metadata.get("pg-version")
pg_version = None if pg_version_str is None else int(pg_version_str)
self.delete_remote_wal_before(metadata["start-wal-segment"], site, metadata["site-prefix"], pg_version)

if metadata.get("format") == BaseBackupFormat.v2:
bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0]
with rohmufile.file_reader(
Expand All @@ -427,7 +436,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata, basebackups):
for chunk in bmeta["chunks"]:
basebackup_data_files.append(
os.path.join(
self._get_site_prefix(site),
metadata["site-prefix"],
"basebackup_chunk",
chunk["chunk_filename"],
)
Expand Down Expand Up @@ -457,14 +466,15 @@ def get_or_create_site_storage(self, site):
self.site_transfers[site] = storage
return storage

def get_remote_basebackups_info(self, site):
def get_remote_basebackups_info(self, site, site_prefix=None):
storage = self.get_or_create_site_storage(site=site)
site_config = self.config["backup_sites"][site]
results = storage.list_path(os.path.join(site_config["prefix"], "basebackup"))
site_prefix = site_prefix or site_config["prefix"]
results = storage.list_path(os.path.join(site_prefix, "basebackup"))
for entry in results:
self.patch_basebackup_info(entry=entry, site_config=site_config)

preservation_requests = storage.list_path(os.path.join(site_config["prefix"], "preservation_request"))
preservation_requests = storage.list_path(os.path.join(site_prefix, "preservation_request"))
backups_to_preserve = parse_preservation_requests(preservation_requests)
for entry in results:
patch_basebackup_metadata_with_preservation(entry, backups_to_preserve)
Expand Down Expand Up @@ -517,7 +527,7 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
if max_age_days and min_backups > 0:
while basebackups and len(basebackups) > min_backups:
if is_basebackup_preserved(basebackups[0], now):
self.log.info("Not deleting more backups because %r still needs to preserved", basebackups[0]["name"])
self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"])
break
# For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when
# backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old.
Expand All @@ -539,31 +549,62 @@ def determine_backups_to_delete(self, *, basebackups, site_config):

def refresh_backup_list_and_delete_old(self, site):
"""Look up basebackups from the object store, prune any extra
backups and return the datetime of the latest backup."""
basebackups = self.get_remote_basebackups_info(site)
self.log.debug("Found %r basebackups", basebackups)
backups from the current and the extra backup sites and update
the state with the up-to-date backups list."""
current_basebackups = self.get_remote_basebackups_info(site)
current_site_prefix = self._get_site_prefix(site)
for basebackup in current_basebackups:
basebackup["metadata"]["site-prefix"] = current_site_prefix

# If `extra_backup_sites_prefixes` is set, let's also check those sites for backups that are due for cleanup.
extra_basebackups = []
if self.config.get("extra_backup_sites_prefixes"):
new_extra_backup_sites_prefixes = []
extra_backup_sites_prefixes = self.state["extra_backup_sites_prefixes"]

for site_prefix in extra_backup_sites_prefixes:
extra_site_basebackups = self.get_remote_basebackups_info(site, site_prefix=site_prefix)
if not extra_site_basebackups:
continue
for basebackup in extra_site_basebackups:
basebackup["metadata"]["site-prefix"] = site_prefix
extra_basebackups.extend(extra_site_basebackups)
# If we did not find any backup in this extra site, we do not include it in the next round of checks.
new_extra_backup_sites_prefixes.append(site_prefix)

self.state["extra_backup_sites_prefixes"] = new_extra_backup_sites_prefixes

extra_basebackups.sort(key=lambda entry: entry["metadata"]["start-time"])
all_basebackups = extra_basebackups + current_basebackups

self.log.debug("Found %r basebackups", all_basebackups)

site_config = self.config["backup_sites"][site]
# Never delete backups from a recovery site. This check is already elsewhere as well
# but still check explicitly here to ensure we certainly won't delete anything unexpectedly
if site_config["active"]:
basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config)
basebackups_to_delete = self.determine_backups_to_delete(basebackups=all_basebackups, site_config=site_config)

for basebackup_to_be_deleted in basebackups_to_delete:
pg_version_str = basebackup_to_be_deleted["metadata"].get("pg-version")
pg_version = None if pg_version_str is None else int(pg_version_str)
last_wal_segment_still_needed = 0
if basebackups:
last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"]
oldest_wal_segment_to_keep = ""
if all_basebackups:
oldest_wal_segment_to_keep = all_basebackups[0]["metadata"]["start-wal-segment"]

if last_wal_segment_still_needed:
if oldest_wal_segment_to_keep:
# This is breaking concurrent PITR starting from the *previous* backup.
# That's why once a backup is preserved, we keep that backup and all the next ones.
self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)
self.delete_remote_wal_before(
oldest_wal_segment_to_keep, site, basebackup_to_be_deleted["metadata"]["site-prefix"], pg_version
)
self.delete_remote_basebackup(
site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups
site,
basebackup_to_be_deleted["name"],
basebackup_to_be_deleted["metadata"],
basebackups=all_basebackups
)
self.state["backup_sites"][site]["basebackups"] = basebackups
self.state["backup_sites"][site]["basebackups"] = current_basebackups

def get_normalized_backup_time(self, site_config, *, now=None):
"""Returns the closest historical backup time that current time matches to (or current time if it matches).
Expand All @@ -589,6 +630,8 @@ def get_normalized_backup_time(self, site_config, *, now=None):
def set_state_defaults(self, site):
if site not in self.state["backup_sites"]:
self.state["backup_sites"][site] = {"basebackups": []}
if "extra_backup_sites_prefixes" not in self.state:
self.state["extra_backup_sites_prefixes"] = self.config.get("extra_backup_sites_prefixes", [])

def startup_walk_for_missed_files(self):
"""Check xlog and xlog_incoming directories for files that receivexlog has received but not yet
Expand Down
Loading

0 comments on commit b9980a9

Please sign in to comment.