Commit

Add support for PG17
In PostgreSQL versions prior to 17, the `pg_walfile_name` function could
return the previous WAL file if the provided LSN fell exactly on the boundary between WAL segments.
PG17 fixed this: the function now returns the WAL file that actually contains the given LSN. This change
affects the way we retrieve the backup_end_wal_segment for the backup metadata.
Therefore, instead of relying on our old behavior, we now use the LSN yielded by
pg_backup_stop.
kathia-barahona committed Nov 19, 2024
1 parent 69d1115 commit 976c201
Showing 10 changed files with 123 additions and 27 deletions.
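
For illustration only (not part of this commit), here is a minimal sketch of how a WAL segment file name relates to an LSN, assuming the default 16 MB segment size and timeline 1; the helper name walfile_name is hypothetical. It shows why an LSN that lands exactly on a segment boundary is sensitive to the pg_walfile_name change described in the commit message above.

# Hypothetical helper, for illustration only; mirrors PostgreSQL's XLogFileName() naming
# scheme for the default 16 MB (0x01000000) segment size.
def walfile_name(lsn: str, timeline: int = 1, segment_size: int = 0x01000000) -> str:
    xlog_id, xlog_offset = (int(part, 16) for part in lsn.split("/"))
    segment_number = ((xlog_id << 32) | xlog_offset) // segment_size
    segments_per_xlog_id = 0x100000000 // segment_size
    return (
        f"{timeline:08X}"
        f"{segment_number // segments_per_xlog_id:08X}"
        f"{segment_number % segments_per_xlog_id:08X}"
    )

# An LSN sitting exactly on a segment boundary belongs to the new segment:
print(walfile_name("0/2000000"))  # -> 000000010000000000000002
# Before PG17, pg_walfile_name('0/2000000') reported the previous segment (...0001) instead;
# from PG17 on it reports ...0002, the file that actually contains the LSN.

The new test below computes the expected end segment from the LSN returned by pg_backup_stop with the same arithmetic.
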
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -77,7 +77,7 @@ jobs:
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
# Setup build deps
sudo apt-get install -y libsnappy-dev postgresql-10 postgresql-11 postgresql-12 postgresql-13 postgresql-14 postgresql-15 postgresql-16
sudo apt-get install -y libsnappy-dev postgresql-12 postgresql-13 postgresql-14 postgresql-15 postgresql-16 postgresql-17
# Setup common python dependencies
python -m pip install --upgrade pip
pip install .
2 changes: 1 addition & 1 deletion README.rst
@@ -133,7 +133,7 @@ python 3.10, 3.11 and 3.12 virtual environments and installations of postgresql
By default vagrant up will start a Virtualbox environment. The Vagrantfile will also work for libvirt, just prefix
``VAGRANT_DEFAULT_PROVIDER=libvirt`` to the ``vagrant up`` command.

Any combination of Python (3.10, 3.11 and 3.12) and Postgresql (12, 13, 14, 15 and 16)
Any combination of Python (3.10, 3.11 and 3.12) and Postgresql (12, 13, 14, 15, 16 and 17)

Bring up vagrant instance and connect via ssh::

2 changes: 1 addition & 1 deletion Vagrantfile
@@ -39,7 +39,7 @@ Vagrant.configure("2") do |config|
sed -i "s/^#create_main_cluster.*/create_main_cluster=false/g" /etc/postgresql-common/createcluster.conf
apt-get install -y python{3.10,3.11,3.12} python{3.10,3.11,3.12}-dev python{3.10,3.11,3.12}-venv
apt-get install -y postgresql-{12,13,14,15,16} postgresql-server-dev-{12,13,14,15,16}
apt-get install -y postgresql-{12,13,14,15,16,17} postgresql-server-dev-{12,13,14,15,16,17}
username="$(< /dev/urandom tr -dc a-z | head -c${1:-32};echo;)"
password=$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c${1:-32};echo;)
4 changes: 2 additions & 2 deletions docs/development.rst
@@ -54,12 +54,12 @@ Vagrant
=======

The Vagrantfile can be used to setup a vagrant development environment. The vagrant environment has
python 3.10, 3.11 and 3.12 virtual environments and installations of postgresql 12, 13, 14, 15 and 16.
python 3.10, 3.11 and 3.12 virtual environments and installations of postgresql 12, 13, 14, 15, 16 and 17.

By default vagrant up will start a Virtualbox environment. The Vagrantfile will also work for libvirt, just prefix
``VAGRANT_DEFAULT_PROVIDER=libvirt`` to the ``vagrant up`` command.

Any combination of Python (3.10, 3.11 and 3.12) and Postgresql (12, 13, 14, 15 and 16)
Any combination of Python (3.10, 3.11 and 3.12) and Postgresql (12, 13, 14, 15, 16 and 17)

Bring up vagrant instance and connect via ssh::

55 changes: 40 additions & 15 deletions pghoard/basebackup/base.py
@@ -16,6 +16,7 @@
import tarfile
import time
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional, Tuple
@@ -52,6 +53,13 @@
]


@dataclass
class BackupStopResult:
lsn: str
backup_label: str
backup_end_time: datetime.datetime


class PGBaseBackup(PGHoardThread):
def __init__(
self,
@@ -670,11 +678,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp:
pg_control = fp.read()

if self.pg_version_server >= 150000:
cursor.execute("SELECT labelfile FROM pg_backup_stop()")
else:
cursor.execute("SELECT labelfile FROM pg_stop_backup(false)")
backup_label = cursor.fetchone()[0] # type: ignore
backup_stop_result = self._stop_backup(cursor)
db_conn.commit()
backup_stopped = True

@@ -690,13 +694,13 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
finally:
db_conn.rollback()
if not backup_stopped:
self._stop_backup(cursor)
_ = self._stop_backup(cursor)
db_conn.commit()

assert backup_label
backup_label_data = backup_label.encode("utf-8")
assert backup_stop_result
backup_label_data = backup_stop_result.backup_label.encode("utf-8")
backup_start_wal_segment, backup_start_time = self.parse_backup_label(backup_label_data)
backup_end_wal_segment, backup_end_time = self.get_backup_end_segment_and_time(db_conn)
backup_end_wal_segment, backup_end_time = self.get_backup_end_segment_and_time(backup_stop_result, db_conn)

# Generate and upload the metadata chunk
metadata = {
@@ -769,7 +773,7 @@ def find_and_split_files_to_backup(self, *, pgdata, tablespaces, target_chunk_si
chunks.append(one_chunk_files)
return total_file_count, chunks

def get_backup_end_segment_and_time(self, db_conn):
def get_backup_end_segment_and_time(self, backup_stop_result: BackupStopResult, db_conn):
"""Grab a timestamp and WAL segment name after the end of the backup: this is a point in time to which
we must be able to recover, and the last WAL segment that is required for the backup to be
consistent.
@@ -792,6 +796,23 @@ def get_backup_end_segment_and_time(self, db_conn):
db_conn.commit()
return None, backup_end_time

if self.pg_version_server >= 170000:
# In PostgreSQL versions prior to 17, the `pg_walfile_name` function could return the previous WAL file
# if the provided LSN fell at the boundary between WAL segments (e.g., at the start of the next WAL file).
# This could lead to inconsistencies in the backup metadata, as the LSN obtained from `pg_backup_stop`
# might not correspond to the actual WAL file that contains the backup's end LSN.
#
# To handle this, we had to fetch the current LSN after stopping the backup and use it to determine the
# correct WAL file name, ensuring the backup's end segment was accurately identified.
#
# However, in PostgreSQL 17 and later, this issue is resolved. `pg_walfile_name` now always returns
# the current WAL file, regardless of where the LSN falls within the segment. As a result, the extra step
# of manually fetching the LSN after `pg_backup_stop` is no longer necessary; the output from
# `pg_backup_stop` is now accurate and can be relied upon directly.

cursor.execute(f"SELECT pg_walfile_name('{backup_stop_result.lsn}')")
return cursor.fetchone()[0], backup_stop_result.backup_end_time

if self.pg_version_server >= 100000:
cursor.execute("SELECT pg_walfile_name(pg_current_wal_lsn()), txid_current()")
else:
@@ -820,8 +841,12 @@ def _start_backup(self, cursor: psycopg2.extensions.cursor, basebackup_name: str
else:
cursor.execute("SELECT pg_start_backup(%s, true, false)", [basebackup_name])

def _stop_backup(self, cursor: psycopg2.extensions.cursor) -> None:
if self.pg_version_server >= 150000:
cursor.execute("SELECT pg_backup_stop()")
else:
cursor.execute("SELECT pg_stop_backup(false)")
def _stop_backup(self, cursor: psycopg2.extensions.cursor) -> BackupStopResult:
stop_backup_func = "pg_backup_stop()" if self.pg_version_server >= 150000 else "pg_stop_backup(false)"
cursor.execute(f"SELECT lsn, labelfile, now() FROM {stop_backup_func}")
result = cursor.fetchone()

if not isinstance(result, tuple): # mypy: help
raise ValueError("No rows returned after trying to stop backup.")

return BackupStopResult(*result)
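
As an aside (illustrative only, not part of this diff), the two helpers above combine roughly as follows. The name stop_backup_end_segment is hypothetical, and the cursor is assumed to belong to the same session that started the backup; on PG >= 17 the lsn returned by pg_backup_stop() is passed straight to pg_walfile_name(), while older servers still fall back to pg_current_wal_lsn().

def stop_backup_end_segment(cursor, pg_version_server: int) -> str:
    # Stop the backup; both pg_backup_stop() (PG >= 15) and the non-exclusive
    # pg_stop_backup(false) return lsn and labelfile columns.
    stop_func = "pg_backup_stop()" if pg_version_server >= 150000 else "pg_stop_backup(false)"
    cursor.execute(f"SELECT lsn, labelfile FROM {stop_func}")
    lsn, _labelfile = cursor.fetchone()
    if pg_version_server >= 170000:
        # PG17+: pg_walfile_name() returns the file containing this LSN, even at a segment boundary
        cursor.execute("SELECT pg_walfile_name(%s)", [lsn])
    else:
        # Older servers: use the current LSN, as the pre-17 code path above does
        cursor.execute("SELECT pg_walfile_name(pg_current_wal_lsn())")
    return cursor.fetchone()[0]
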
2 changes: 1 addition & 1 deletion pghoard/config.py
@@ -17,7 +17,7 @@
from pghoard.common import (extract_pg_command_version_string, pg_major_version, pg_version_string_to_number)
from pghoard.postgres_command import PGHOARD_HOST, PGHOARD_PORT

SUPPORTED_VERSIONS = ["16", "15", "14", "13", "12", "11", "10", "9.6", "9.5", "9.4", "9.3"]
SUPPORTED_VERSIONS = ["17", "16", "15", "14", "13", "12", "11", "10", "9.6", "9.5", "9.4", "9.3"]


def get_cpu_count():
1 change: 1 addition & 0 deletions pghoard/wal.py
@@ -32,6 +32,7 @@
0xD10D: 140000,
0xD110: 150000,
0xD113: 160000,
0xD116: 170000,
}
WAL_MAGIC_BY_VERSION = {value: key for key, value in WAL_MAGIC.items()}
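
For context (illustrative only, not part of this diff): 0xD116 is the WAL page magic that maps to 170000 (PostgreSQL 17) in the table above. A rough sketch of how such a mapping can be consulted, assuming a little-endian platform and the standard XLogPageHeaderData layout where xlp_magic is the first uint16 of a page header; wal_version_from_page is a hypothetical name:

import struct

def wal_version_from_page(page_header: bytes) -> int:
    # xlp_magic is the first uint16 of the WAL page header; e.g. 0xD116 -> 170000 (PostgreSQL 17)
    (magic,) = struct.unpack("<H", page_header[:2])
    return WAL_MAGIC[magic]
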

66 changes: 66 additions & 0 deletions test/basebackup/test_basebackup.py
@@ -946,3 +946,69 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d
"7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192,
"7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800
}

def test_backup_end_segment_and_time(self, db, pg_version: str):
conn = db.connect()
conn.autocommit = True
cursor = conn.cursor()

# Force a WAL switch to guarantee that the next LSN is at the start of a new WAL segment
cursor.execute("SELECT pg_switch_wal();")

# Generate enough activity to ensure we approach a segment boundary (16 MB)
cursor.execute("DROP TABLE IF EXISTS wal_test;")
cursor.execute("CREATE TABLE wal_test (id serial, data text);")
for _ in range(1024):
cursor.execute("INSERT INTO wal_test (data) SELECT 'x' || repeat('y', 1024) FROM generate_series(1, 1024);")
cursor.execute("CHECKPOINT;")
cursor.execute("SELECT pg_switch_wal();")

# Basic PGBaseBackup; we only exercise get_backup_end_segment_and_time to catch discrepancies
# between versions when the LSN falls on a segment boundary

pgb = PGBaseBackup(
config=SIMPLE_BACKUP_CONFIG,
site="foosite",
connection_info=None,
basebackup_path=None,
compression_queue=None,
storage=None,
transfer_queue=None,
metrics=metrics.Metrics(statsd={}),
pg_version_server=int(pg_version) * 10000,
)

basebackup_name = "test_backup"

# Manually start and stop the backup (not pretty, but enough to exercise the basic behavior)
# pylint: disable=protected-access
pgb._start_backup(cursor, basebackup_name)
stop_backup_result = pgb._stop_backup(cursor)

expected_segment = self._calculate_wal_segment(lsn=stop_backup_result.lsn)

# For >= PG17 we must rely on the LSN from stop_backup_result, not on the current LSN. To demonstrate this,
# temporarily set pg_version_server to PG16 and show what happens when the pg_walfile_name fix is not taken into account
if int(pg_version) >= 17:
pgb.pg_version_server = 160000
result_end_segment, _ = pgb.get_backup_end_segment_and_time(stop_backup_result, conn)
assert result_end_segment != expected_segment  # the < PG17 code path is not accurate here

# set back and test again
pgb.pg_version_server = int(pg_version) * 10000

result_end_segment, _ = pgb.get_backup_end_segment_and_time(stop_backup_result, conn)
assert result_end_segment == expected_segment

def _calculate_wal_segment(self, lsn: str, timeline: str = "00000001") -> str:
# Extract XLog ID and Offset from LSN (format: XLogID/Offset)
xlog_id, xlog_offset = map(lambda x: int(x, 16), lsn.split("/"))

# Calculate the WAL segment number (default 16 MB segment size)
segment_number = (xlog_id << 32 | xlog_offset) // 0x01000000

# Format as in PostgreSQL's XLogFileName(): 0x100000000 // 0x01000000 = 256 segments per XLog ID
xlog_id_formatted = f"{segment_number // 0x100:08X}"
xlog_offset_formatted = f"{segment_number % 0x100:08X}"

return f"{timeline}{xlog_id_formatted}{xlog_offset_formatted}"
2 changes: 1 addition & 1 deletion test/conftest.py
@@ -35,7 +35,7 @@

logutil.configure_logging()

DEFAULT_PG_VERSIONS = ["16", "15", "14", "13", "12"]
DEFAULT_PG_VERSIONS = ["17", "16", "15", "14", "13", "12"]


def port_is_listening(hostname: str, port: int, timeout: float = 0.5) -> bool:
14 changes: 9 additions & 5 deletions test/test_webserver.py
@@ -242,7 +242,7 @@ def _switch_wal(self, db, count):
conn.close()
return start_wal, end_wal

def test_archive_sync(self, db, pghoard):
def test_archive_sync(self, db, pghoard, pg_version: str):
log = logging.getLogger("test_archive_sync")
store = pghoard.transfer_agents[0].get_object_storage(pghoard.test_site)

@@ -273,7 +273,11 @@ def list_archive(folder):
# cluster between all tests)
pg_wal_dir = get_pg_wal_directory(pghoard.config["backup_sites"][pghoard.test_site])
pg_wals = {f for f in os.listdir(pg_wal_dir) if wal.WAL_RE.match(f) and f > start_wal}
assert len(pg_wals) >= 4

# Account for the pg_walfile_name change: on versions before PG17, pg_walfile_name(pg_current_wal_lsn())
# might return the previous WAL file name instead of the current one.
expected_min_wals = 4 if int(pg_version) < 17 else 3
assert len(pg_wals) >= expected_min_wals

# create a couple of "recycled" xlog files that we must ignore
last_wal = sorted(pg_wals)[-1]
@@ -291,7 +295,7 @@ def write_dummy_wal(inc):
# check what we have archived, there should be at least the three
# above WALs that are NOT there at the moment
archived_wals = set(list_archive("xlog"))
assert len(pg_wals - archived_wals) >= 4
assert len(pg_wals - archived_wals) >= expected_min_wals
# now perform an archive sync
arsy = ArchiveSync()
arsy.run(["--site", pghoard.test_site, "--config", pghoard.config_path])
@@ -329,7 +333,7 @@ def write_dummy_wal(inc):
"restore_command = 'false'",
]
if Version(db.pgver).major >= 12:
with open(os.path.join(db.pgdata, "standby.signal"), "w") as fp:
with open(os.path.join(db.pgdata, "standby.signal"), "w"):
pass

recovery_conf_path = "postgresql.auto.conf"
@@ -339,7 +343,7 @@
recovery_conf_path = "recovery.conf"
open_mode = "w"

with open(os.path.join(db.pgdata, recovery_conf_path), open_mode) as fp:
with open(os.path.join(db.pgdata, recovery_conf_path), open_mode) as fp: # type: ignore
fp.write("\n".join(recovery_conf) + "\n")

# start PG and promote it
