From dafc7831097455ad064dc01510b9a203fc29c05e Mon Sep 17 00:00:00 2001 From: munakoiso Date: Tue, 12 Nov 2024 19:56:09 +0500 Subject: [PATCH] better logging if _handle_slots --- src/main.py | 12 +++++++++--- src/pg.py | 36 ++++++++++++++++++++---------------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/main.py b/src/main.py index 8e1d5e5..2cd09ac 100644 --- a/src/main.py +++ b/src/main.py @@ -1174,15 +1174,21 @@ def _handle_slots(self): # create slots slot_names = [helpers.app_name_from_fqdn(fqdn) for fqdn in slot_lock_holders] + actual_replication_slots = self.db.get_replication_slots() + if actual_replication_slots is None: + logging.warning('Failed to get actual replication slots') + # However, we can continue here and try to create slots. None of slots will be dropped, but some might be created + else: + logging.debug('Actual replication slots: %s', actual_replication_slots) - if not self.db.replication_slots('create', slot_names): + if not self.db.create_replication_slots(slot_names, verbose=False): logging.warning('Could not create replication slots. %s', slot_names) # drop slots if my_hostname in non_holders_hosts: non_holders_hosts.remove(my_hostname) slot_names_to_drop = [helpers.app_name_from_fqdn(fqdn) for fqdn in non_holders_hosts] - if not self.db.replication_slots('drop', slot_names_to_drop): + if not self.db.drop_replication_slots(slot_names_to_drop, verbose=False): logging.warning('Could not drop replication slots. %s', slot_names_to_drop) def _get_db_state(self): @@ -1348,7 +1354,7 @@ def _promote_handle_slots(self): return False # Create replication slots, regardless of whether replicas hold DCS locks for replication slots. hosts = [helpers.app_name_from_fqdn(fqdn) for fqdn in hosts] - if not self.db.replication_slots('create', hosts): + if not self.db.create_replication_slots(hosts): logging.error('Could not create replication slots. Releasing the lock in ZK.') return False diff --git a/src/pg.py b/src/pg.py index 29f066b..d0f94d6 100644 --- a/src/pg.py +++ b/src/pg.py @@ -143,15 +143,17 @@ def _offline_detect_pgdata(self): logging.error(line.rstrip()) @helpers.return_none_on_error - def _get_replication_slots(self): + def get_replication_slots(self): res = self._exec_query('SELECT slot_name FROM pg_replication_slots;').fetchall() return [i[0] for i in res] def _create_replication_slot(self, slot_name): + logging.info('Creating slot %s.', slot_name) query = f"SELECT pg_create_physical_replication_slot('{slot_name}', true)" return self._exec_without_result(query) def _drop_replication_slot(self, slot_name): + logging.info('Dropping slot %s.', slot_name) query = f"SELECT pg_drop_replication_slot('{slot_name}')" return self._exec_without_result(query) @@ -736,24 +738,26 @@ def stop_postgresql(self, timeout=60): logging.warning(line.rstrip()) return self._cmd_manager.stop_postgresql(timeout, self.pgdata) - def replication_slots(self, action, slots): - """ - Perform replication slots action (create/drop) - """ - current = self._get_replication_slots() + def create_replication_slots(self, slots, verbose=True): + current = self.get_replication_slots() for slot in slots: - if action == 'create': - if current and slot in current: + if current and slot in current: + if verbose: logging.debug('Slot %s already exists.', slot) - continue - if not self._create_replication_slot(slot): - return False - else: - if current is not None and slot not in current: + continue + if not self._create_replication_slot(slot): + return False + return True + + def drop_replication_slots(self, slots, verbose=True): + current = self.get_replication_slots() + for slot in slots: + if current is not None and slot not in current: + if verbose: logging.debug('Slot %s does not exist.', slot) - continue - if not self._drop_replication_slot(slot): - return False + continue + if not self._drop_replication_slot(slot): + return False return True def is_replaying_wal(self, check_time):