diff --git a/pghoard/compressor.py b/pghoard/compressor.py index 8635e435..89a7c76d 100644 --- a/pghoard/compressor.py +++ b/pghoard/compressor.py @@ -43,7 +43,7 @@ class BaseCompressorEvent: file_path: Path backup_site_name: str source_data: Union[BinaryIO, Path] - callback_queue: CallbackQueue + callback_queue: Optional[CallbackQueue] metadata: Dict[str, str] diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 6e632873..aee650a8 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -934,8 +934,50 @@ def _get_all_threads(self): all_threads.extend(self.transfer_agents) return all_threads + def _wait_for_queue_to_be_emptied( + self, + queue: Queue, + queue_name: str, + timeout: Optional[int] = None, + ) -> None: + start = time.monotonic() + while True: + if queue.empty(): + self.log.info("%r queue has been emptied.", queue_name) + break + + if timeout is not None and time.monotonic() - start > timeout: + self.log.warning("Exceeded waiting time for %r queue to be emptied", queue_name) + break + + time.sleep(0.1) + def handle_exit_signal(self, _signal=None, _frame=None): # pylint: disable=unused-argument self.log.warning("Quitting, signal: %r", _signal) + if _signal == signal.SIGTERM: + self.graceful_shutdown() + else: + self.quit() + + def graceful_shutdown(self) -> None: + """ + Makes sure all missing files are compressed, uploaded and deleted before all threads are inactive. + + Steps to follow: + - Shutdown receivexlogs and walreceivers threads + - Wait for compression, transfer and deletion queues to be empty + - Quit (stop remaining threads and write state file) + """ + self.log.info("Gracefully shutting down...") + self.running = False + for thread in [*self.receivexlogs.values(), *self.walreceivers.values()]: + thread.running = False + + # wait for all queues to be emptied + self._wait_for_queue_to_be_emptied(self.compression_queue, "compression") + self._wait_for_queue_to_be_emptied(self.transfer_queue, "transfer") + self._wait_for_queue_to_be_emptied(self.wal_file_deletion_queue, "wal_file_deletion") + self.quit() def quit(self): diff --git a/test/test_pghoard.py b/test/test_pghoard.py index f3170b73..8dfc384a 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -11,14 +11,19 @@ import time from pathlib import Path from typing import Any, Dict -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch import pytest import pghoard.pghoard as pghoard_module -from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file) +from pghoard.common import ( + BaseBackupFormat, FileType, FileTypePrefixes, create_alert_file, delete_alert_file, write_json_file +) +from pghoard.compressor import CompressionEvent from pghoard.pghoard import PGHoard from pghoard.pgutil import create_connection_string +from pghoard.receivexlog import PGReceiveXLog +from pghoard.transfer import TransferAgent from .base import PGHoardTestCase from .util import dict_to_tar_file, switch_wal, wait_for_xlog @@ -755,6 +760,61 @@ def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, upload_event = self.pghoard.transfer_queue.get(timeout=1.0) assert upload_event.file_type == file_type + @patch("pghoard.compressor.wal.verify_wal", Mock()) + @patch.object(PGReceiveXLog, "run", Mock()) + @patch.object(TransferAgent, "get_object_storage") + def test_graceful_shutdown_with_partial_wal_file( + self, + mocked_get_object_storage: MagicMock, + ) -> None: + compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) + uncompressed_wal_path = compressed_wal_path + "_incoming" + + self.config["backup_sites"][self.test_site]["active_backup_mode"] = "pg_receivexlog" + + self.pghoard.receivexlog_listener( + self.test_site, self.config["backup_sites"][self.test_site]["nodes"][0], uncompressed_wal_path + ) + + assert len(self.pghoard.receivexlogs) == 1 + + file_name = "000000010000000000000008" + uncompressed_file_path = os.path.join(uncompressed_wal_path, file_name) + with open(uncompressed_file_path, "wb") as fp: + fp.write(b"foo") + + self.pghoard.compression_queue.put( + CompressionEvent( + file_type=FileType.Wal, + file_path=FileTypePrefixes[FileType.Wal] / file_name, + delete_file_after_compression=True, + backup_site_name=self.test_site, + source_data=Path(uncompressed_file_path), + callback_queue=None, + metadata={} + ) + ) + + # run compressors, transfer_agents and wal_file_deleter + for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]: + thread.start() + + self.pghoard.graceful_shutdown() + + assert self.pghoard.compression_queue.qsize() == 0 + assert self.pghoard.transfer_queue.qsize() == 0 + assert self.pghoard.wal_file_deletion_queue.qsize() == 0 + + # called once for uploading renamed partial file + assert mocked_get_object_storage.call_count == 1 + + # uncompressed file should still exist since WALDeletionThread always keeps last file + assert os.path.exists(uncompressed_file_path) + + # verify compressors, transfer_agents and wal_file_deleter are not running + for thread in [*self.pghoard.compressors, *self.pghoard.transfer_agents, self.pghoard.wal_file_deleter]: + assert thread.is_alive() is False + class TestPGHoardWithPG: def test_auth_alert_files(self, db, pghoard):