diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py index 260bd850..91c639b3 100644 --- a/pghoard/basebackup/base.py +++ b/pghoard/basebackup/base.py @@ -16,6 +16,7 @@ import tarfile import time from contextlib import suppress +from dataclasses import dataclass from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Dict, Optional, Tuple @@ -52,6 +53,13 @@ ] +@dataclass +class BackupStopResult: + lsn: str + backup_label: str + backup_end_time: datetime.datetime + + class PGBaseBackup(PGHoardThread): def __init__( self, @@ -670,11 +678,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp: pg_control = fp.read() - if self.pg_version_server >= 150000: - cursor.execute("SELECT labelfile FROM pg_backup_stop()") - else: - cursor.execute("SELECT labelfile FROM pg_stop_backup(false)") - backup_label = cursor.fetchone()[0] # type: ignore + backup_stop_result = self._stop_backup(cursor) db_conn.commit() backup_stopped = True @@ -690,13 +694,13 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = finally: db_conn.rollback() if not backup_stopped: - self._stop_backup(cursor) + _ = self._stop_backup(cursor) db_conn.commit() - assert backup_label - backup_label_data = backup_label.encode("utf-8") + assert backup_stop_result + backup_label_data = backup_stop_result.backup_label.encode("utf-8") backup_start_wal_segment, backup_start_time = self.parse_backup_label(backup_label_data) - backup_end_wal_segment, backup_end_time = self.get_backup_end_segment_and_time(db_conn) + backup_end_wal_segment, backup_end_time = self.get_backup_end_segment_and_time(backup_stop_result, db_conn) # Generate and upload the metadata chunk metadata = { @@ -769,7 +773,7 @@ def find_and_split_files_to_backup(self, *, pgdata, tablespaces, target_chunk_si chunks.append(one_chunk_files) return total_file_count, chunks - def get_backup_end_segment_and_time(self, db_conn): + def get_backup_end_segment_and_time(self, backup_stop_result: BackupStopResult, db_conn): """Grab a timestamp and WAL segment name after the end of the backup: this is a point in time to which we must be able to recover to, and the last WAL segment that is required for the backup to be consistent. @@ -792,6 +796,23 @@ def get_backup_end_segment_and_time(self, db_conn): db_conn.commit() return None, backup_end_time + if self.pg_version_server >= 170000: + # In PostgreSQL versions prior to 17, the `pg_walfile_name` function could return the previous WAL file + # if the provided LSN fell at the boundary between WAL segments (e.g., at the start of the next WAL file). + # This could lead to inconsistencies in the backup metadata, as the LSN obtained from `pg_backup_stop` + # might not correspond to the actual WAL file that contains the backup's end LSN. + # + # To handle this, we had to fetch the current LSN after stopping the backup and use it to determine the + # correct WAL file name, ensuring the backup's end segment was accurately identified. + # + # However, in PostgreSQL 17 and later, this issue is resolved. `pg_walfile_name` now always returns + # the current WAL file, regardless of where the LSN falls within the segment. As a result, the extra step + # of manually fetching the LSN after `pg_backup_stop` is no longer necessary— the output from + # `pg_backup_stop` is now accurate and can be relied upon directly. + + cursor.execute(f"SELECT pg_walfile_name('{backup_stop_result.lsn}')") + return cursor.fetchone()[0], backup_stop_result.backup_end_time + if self.pg_version_server >= 100000: cursor.execute("SELECT pg_walfile_name(pg_current_wal_lsn()), txid_current()") else: @@ -820,8 +841,12 @@ def _start_backup(self, cursor: psycopg2.extensions.cursor, basebackup_name: str else: cursor.execute("SELECT pg_start_backup(%s, true, false)", [basebackup_name]) - def _stop_backup(self, cursor: psycopg2.extensions.cursor) -> None: - if self.pg_version_server >= 150000: - cursor.execute("SELECT pg_backup_stop()") - else: - cursor.execute("SELECT pg_stop_backup(false)") + def _stop_backup(self, cursor: psycopg2.extensions.cursor) -> BackupStopResult: + stop_backup_func = "pg_backup_stop()" if self.pg_version_server >= 150000 else "pg_stop_backup(false)" + cursor.execute(f"SELECT lsn, labelfile, now() FROM {stop_backup_func}") + result = cursor.fetchone() + + if not isinstance(result, tuple): # mypy: help + raise ValueError("No rows returned after trying to stop backup.") + + return BackupStopResult(*result) diff --git a/pghoard/wal.py b/pghoard/wal.py index 58f55ede..8e850a80 100644 --- a/pghoard/wal.py +++ b/pghoard/wal.py @@ -32,6 +32,7 @@ 0xD10D: 140000, 0xD110: 150000, 0xD113: 160000, + 0xD116: 170000, } WAL_MAGIC_BY_VERSION = {value: key for key, value in WAL_MAGIC.items()} diff --git a/test/basebackup/test_basebackup.py b/test/basebackup/test_basebackup.py index 961e524d..07df904e 100644 --- a/test/basebackup/test_basebackup.py +++ b/test/basebackup/test_basebackup.py @@ -946,3 +946,52 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192, "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800 } + + def test_backup_end_segment_and_time(self, db, pg_version: str): + conn = db.connect() + conn.autocommit = True + cursor = conn.cursor() + + # Generate enough activity to ensure we approach a segment boundary (16 MB) + cursor.execute("DROP TABLE IF EXISTS wal_test;") + cursor.execute("CREATE TABLE wal_test (id serial, data text);") + for _ in range(1024): + cursor.execute("INSERT INTO wal_test (data) SELECT 'x' || repeat('y', 1024) FROM generate_series(1, 1024);") + cursor.execute("CHECKPOINT;") + + # basic PGBaseBackup, just use backup_end_segment_and_time for noticing discrepancies among versions + # when LSN falls on a boundary + + pgb = PGBaseBackup( + config=SIMPLE_BACKUP_CONFIG, + site="foosite", + connection_info=None, + basebackup_path=None, + compression_queue=None, + storage=None, + transfer_queue=None, + metrics=metrics.Metrics(statsd={}), + pg_version_server=int(pg_version) * 10000, + ) + + basebackup_name = "test_backup" + + # Manually start and stop backup (could be better, but just need to test basic behavior) + # pylint: disable=protected-access + pgb._start_backup(cursor, basebackup_name) + stop_backup_result = pgb._stop_backup(cursor) + + expected_segment = "000000010000000000000047" + + # for >=PG17 must rely on stop_backup_result LSN, not on current_lsn. For demo this, + # will change pg_version_server to PG16 and show what will happen if fix in pg_walfile_name is not considered + if int(pg_version) >= 17: + pgb.pg_version_server = 160000 + result_end_segment, _ = pgb.get_backup_end_segment_and_time(stop_backup_result, conn) + assert result_end_segment == "000000010000000000000048" # relying on steps for