From cde176bff290f9a15bd89073682c9d54faef9609 Mon Sep 17 00:00:00 2001 From: Shahriyar Rzayev Date: Thu, 6 May 2021 15:31:20 +0400 Subject: [PATCH] Release v2.0.2 (#445) * Increasing code coverage by adding extra unit tests (#444) * First batch of tests * Added API tests, continued on file based tests * Got 50% test coverage - needs to be addressed further * update for README.md * Formatting with black and version bump * Passing the mypy check * extra linting, formatting --- README.md | 36 ++- docs/conf.py | 2 +- mysql_autoxtrabackup/autoxtrabackup.py | 257 ++++++++++-------- .../backup_backup/backup_builder.py | 2 +- .../backup_backup/backuper.py | 39 ++- mysql_autoxtrabackup/utils/helpers.py | 5 +- mysql_autoxtrabackup/utils/version.py | 4 +- tests/conftest.py | 9 + tests/test_api.py | 21 ++ tests/test_backup.py | 104 ++++++- tests/test_helpers.py | 21 ++ tests/test_mysql_cli.py | 9 + 12 files changed, 376 insertions(+), 133 deletions(-) create mode 100644 tests/test_api.py create mode 100644 tests/test_helpers.py create mode 100644 tests/test_mysql_cli.py diff --git a/README.md b/README.md index 02f2b50..4a41921 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,41 @@ backups. Then I decided to automate this process. In other words, preparing necessary commands for backup and prepare stage were automated. +We have nice CLI with necessary options: + +``` +autoxtrabackup --help +Usage: autoxtrabackup [OPTIONS] + +Options: + --dry-run Enable the dry run. + --prepare Prepare/recover backups. + --run-server Start the FastAPI app for serving API + --backup Take full and incremental backups. + --version Version information. 
+ --defaults-file TEXT Read options from the given file [default: / + home/shako/.autoxtrabackup/autoxtrabackup.cn + f] + + --tag TEXT Pass the tag string for each backup + --show-tags Show backup tags and exit + -v, --verbose Be verbose (print to console) + -lf, --log-file TEXT Set log file [default: /home/shako/.autoxtr + abackup/autoxtrabackup.log] + + -l, --log, --log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL] + Set log level [default: INFO] + --log-file-max-bytes INTEGER Set log file max size in bytes [default: + 1073741824] + + --log-file-backup-count INTEGER + Set log file backup count [default: 7] + --help Print help message and exit. +``` + + If you think, CLI is not for you. We have experimental feature where you can start API server -and take backups using API call. +and take backups using API call (ATTENTION: FastAPI involved) ``` sudo `which autoxtrabackup` --run-server @@ -46,6 +79,7 @@ Development: Current major version is >= 2.0 - so if you want to help, please do changes on this branch and then kindly send PR :) I also encourage you to upgrade from older version as the code base fully updated. +Do you have an idea or a question? Please open an issue. Read full documentation here: ---------------------------------------------- diff --git a/docs/conf.py b/docs/conf.py index f6824c0..62d576d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,8 +20,8 @@ # documentation root, use os.path.abspath to make it absolute, like shown here. 
# import sphinx_rtd_theme -from mysql_autoxtrabackup.utils.version import VERSION +from mysql_autoxtrabackup.utils.version import VERSION sys.path.insert(0, os.path.abspath("../mysql_autoxtrabackup")) diff --git a/mysql_autoxtrabackup/autoxtrabackup.py b/mysql_autoxtrabackup/autoxtrabackup.py index be66c46..3da19e0 100644 --- a/mysql_autoxtrabackup/autoxtrabackup.py +++ b/mysql_autoxtrabackup/autoxtrabackup.py @@ -3,28 +3,33 @@ import os import re import time +from logging.handlers import RotatingFileHandler +from sys import exit +from sys import platform as _platform +from typing import Optional + import click import humanfriendly # type: ignore import pid # type: ignore +from mysql_autoxtrabackup.api import main +from mysql_autoxtrabackup.backup_backup.backuper import Backup from mysql_autoxtrabackup.backup_prepare.prepare import Prepare from mysql_autoxtrabackup.general_conf import path_config from mysql_autoxtrabackup.general_conf.generalops import GeneralClass -from mysql_autoxtrabackup.backup_backup.backuper import Backup from mysql_autoxtrabackup.process_runner.process_runner import ProcessRunner -from mysql_autoxtrabackup.api import main from mysql_autoxtrabackup.utils import version -from logging.handlers import RotatingFileHandler -from sys import exit -from sys import platform as _platform -from typing import Optional -logger = logging.getLogger('') -destinations_hash = {'linux': '/dev/log', 'linux2': '/dev/log', 'darwin': '/var/run/syslog'} +logger = logging.getLogger("") +destinations_hash = { + "linux": "/dev/log", + "linux2": "/dev/log", + "darwin": "/var/run/syslog", +} def address_matcher(plt: str) -> str: - return destinations_hash.get(plt, ('localhost', 514)) # type: ignore + return destinations_hash.get(plt, ("localhost", 514)) # type: ignore handler = logging.handlers.SysLogHandler(address=address_matcher(_platform)) @@ -43,19 +48,19 @@ def print_help(ctx: click.Context, param: None, value: bool) -> None: def print_version(ctx: click.Context, 
param: None, value: bool) -> None: if not value or ctx.resilient_parsing: return - click.echo( - "Developed by Shahriyar Rzayev from Azerbaijan PUG(http://azepug.az)") + click.echo("Developed by Shahriyar Rzayev from Azerbaijan PUG(http://azepug.az)") click.echo("Link : https://github.com/ShahriyarR/MySQL-AutoXtraBackup") click.echo("Email: rzayev.sehriyar@gmail.com") click.echo( - "Based on Percona XtraBackup: https://github.com/percona/percona-xtrabackup/") - click.echo(f'MySQL-AutoXtraBackup Version: {version.VERSION}') + "Based on Percona XtraBackup: https://github.com/percona/percona-xtrabackup/" + ) + click.echo(f"MySQL-AutoXtraBackup Version: {version.VERSION}") ctx.exit() def check_file_content(file: str) -> Optional[bool]: """Check if all mandatory headers and keys exist in file""" - with open(file, 'r') as config_file: + with open(file, "r") as config_file: file_content = config_file.read() config_headers = ["MySQL", "Backup", "Encrypt", "Compress", "Commands"] @@ -73,19 +78,16 @@ def check_file_content(file: str) -> Optional[bool]: "xtra_prepare", "start_mysql_command", "stop_mysql_command", - "chown_command"] + "chown_command", + ] for header in config_headers: if header not in file_content: - raise KeyError( - "Mandatory header [%s] doesn't exist in %s" % - (header, file)) + raise KeyError("Mandatory header [%s] doesn't exist in %s" % (header, file)) for key in config_keys: if key not in file_content: - raise KeyError( - "Mandatory key \'%s\' doesn't exists in %s." % - (key, file)) + raise KeyError("Mandatory key '%s' doesn't exists in %s." 
% (key, file)) return True @@ -99,7 +101,7 @@ def validate_file(file: str) -> Optional[bool]: raise FileNotFoundError("Specified file does not exist.") # filename extension should be .cnf - pattern = re.compile(r'.*\.cnf') + pattern = re.compile(r".*\.cnf") if pattern.match(file): # Lastly the file should have all 5 required headers @@ -111,74 +113,93 @@ def validate_file(file: str) -> Optional[bool]: @click.command() -@click.option('--dry-run', is_flag=True, help="Enable the dry run.") -@click.option('--prepare', is_flag=True, help="Prepare/recover backups.") -@click.option('--run-server', is_flag=True, help="Start the FastAPI app for serving API") -@click.option('--backup', - is_flag=True, - help="Take full and incremental backups.") -@click.option('--version', - is_flag=True, - callback=print_version, # type: ignore - expose_value=False, - is_eager=True, - help="Version information.") -@click.option('--defaults-file', - default=path_config.config_path_file, - show_default=True, - help="Read options from the given file") # type: ignore -@click.option('--tag', - help="Pass the tag string for each backup") -@click.option('--show-tags', - is_flag=True, - help="Show backup tags and exit") -@click.option('-v', '--verbose', is_flag=True, - help="Be verbose (print to console)") -@click.option('-lf', - '--log-file', - default=path_config.log_file_path, - show_default=True, - help="Set log file") -@click.option('-l', - '--log', - '--log-level', - default='INFO', - show_default=True, - type=click.Choice(['DEBUG', - 'INFO', - 'WARNING', - 'ERROR', - 'CRITICAL']), - help="Set log level") -@click.option('--log-file-max-bytes', - default=1073741824, - show_default=True, - nargs=1, - type=int, - help="Set log file max size in bytes") -@click.option('--log-file-backup-count', - default=7, - show_default=True, - nargs=1, - type=int, - help="Set log file backup count") -@click.option('--help', - is_flag=True, - callback=print_help, # type: ignore - expose_value=False, - 
is_eager=False, - help="Print help message and exit.") +@click.option("--dry-run", is_flag=True, help="Enable the dry run.") +@click.option("--prepare", is_flag=True, help="Prepare/recover backups.") +@click.option( + "--run-server", is_flag=True, help="Start the FastAPI app for serving API" +) +@click.option("--backup", is_flag=True, help="Take full and incremental backups.") +@click.option( + "--version", + is_flag=True, + callback=print_version, # type: ignore + expose_value=False, + is_eager=True, + help="Version information.", +) +@click.option( + "--defaults-file", + default=path_config.config_path_file, # type: ignore + show_default=True, + help="Read options from the given file", +) +@click.option("--tag", help="Pass the tag string for each backup") +@click.option("--show-tags", is_flag=True, help="Show backup tags and exit") +@click.option("-v", "--verbose", is_flag=True, help="Be verbose (print to console)") +@click.option( + "-lf", + "--log-file", + default=path_config.log_file_path, + show_default=True, + help="Set log file", +) +@click.option( + "-l", + "--log", + "--log-level", + default="INFO", + show_default=True, + type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), + help="Set log level", +) +@click.option( + "--log-file-max-bytes", + default=1073741824, + show_default=True, + nargs=1, + type=int, + help="Set log file max size in bytes", +) +@click.option( + "--log-file-backup-count", + default=7, + show_default=True, + nargs=1, + type=int, + help="Set log file backup count", +) +@click.option( + "--help", + is_flag=True, + callback=print_help, # type: ignore + expose_value=False, + is_eager=False, + help="Print help message and exit.", +) @click.pass_context -def all_procedure(ctx, prepare, backup, run_server, tag, show_tags, - verbose, log_file, log, defaults_file, - dry_run, log_file_max_bytes, - log_file_backup_count): +def all_procedure( + ctx, + prepare, + backup, + run_server, + tag, + show_tags, + verbose, + log_file, + 
log, + defaults_file, + dry_run, + log_file_max_bytes, + log_file_backup_count, +): options = GeneralClass(defaults_file) logging_options = options.logging_options backup_options = options.backup_options - formatter = logging.Formatter(fmt='%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) if verbose: ch = logging.StreamHandler() @@ -189,13 +210,22 @@ def all_procedure(ctx, prepare, backup, run_server, tag, show_tags, if log_file: try: - if logging_options.get('log_file_max_bytes') and logging_options.get('log_file_backup_count'): - file_handler = RotatingFileHandler(log_file, mode='a', - maxBytes=int(str(logging_options.get('log_file_max_bytes'))), - backupCount=int(str(logging_options.get('log_file_backup_count')))) + if logging_options.get("log_file_max_bytes") and logging_options.get( + "log_file_backup_count" + ): + file_handler = RotatingFileHandler( + log_file, + mode="a", + maxBytes=int(str(logging_options.get("log_file_max_bytes"))), + backupCount=int(str(logging_options.get("log_file_backup_count"))), + ) else: - file_handler = RotatingFileHandler(log_file, mode='a', - maxBytes=log_file_max_bytes, backupCount=log_file_backup_count) + file_handler = RotatingFileHandler( + log_file, + mode="a", + maxBytes=log_file_max_bytes, + backupCount=log_file_backup_count, + ) file_handler.setFormatter(formatter) logger.addHandler(file_handler) except PermissionError as err: @@ -204,14 +234,14 @@ def all_procedure(ctx, prepare, backup, run_server, tag, show_tags, # set log level in order: 1. user argument 2. config file 3. 
@click default if log is not None: logger.setLevel(log) - elif logging_options.get('log_level'): - logger.setLevel(str(logging_options.get('log_level'))) + elif logging_options.get("log_level"): + logger.setLevel(str(logging_options.get("log_level"))) else: # this is the fallback default log-level. - logger.setLevel('INFO') + logger.setLevel("INFO") validate_file(defaults_file) - pid_file = pid.PidFile(piddir=backup_options.get('pid_dir')) + pid_file = pid.PidFile(piddir=backup_options.get("pid_dir")) try: with pid_file: # User PidFile for locking to single instance @@ -219,19 +249,21 @@ def all_procedure(ctx, prepare, backup, run_server, tag, show_tags, if dry_run_: dry_run_ = 1 logger.warning("Dry run enabled!") - if (prepare is False and - backup is False and - verbose is False and - dry_run is False and - show_tags is False and - run_server is False): + if ( + prepare is False + and backup is False + and verbose is False + and dry_run is False + and show_tags is False + and run_server is False + ): print_help(ctx, None, value=True) elif run_server: main.run_server() elif show_tags and defaults_file: backup_ = Backup(config=defaults_file) - backup_.show_tags(backup_dir=str(backup_options.get('backup_dir'))) + backup_.show_tags(backup_dir=str(backup_options.get("backup_dir"))) elif prepare: prepare_ = Prepare(config=defaults_file, dry_run=dry_run_, tag=tag) prepare_.prepare_backup_and_copy_back() @@ -240,15 +272,26 @@ def all_procedure(ctx, prepare, backup, run_server, tag, show_tags, backup_.all_backup() except (pid.PidFileAlreadyLockedError, pid.PidFileAlreadyRunningError) as error: - if float(str(backup_options.get('pid_runtime_warning'))) and time.time() - os.stat( - pid_file.filename).st_ctime > float(str(backup_options.get('pid_runtime_warning'))): + if float( + str(backup_options.get("pid_runtime_warning")) + ) and time.time() - os.stat(pid_file.filename).st_ctime > float( + str(backup_options.get("pid_runtime_warning")) + ): pid.fh.seek(0) pid_str = 
pid.fh.read(16).split("\n", 1)[0].strip() - logger.warning("Pid file already exists or Pid already running! : ", str(error)) + logger.warning( + "Pid file already exists or Pid already running! : ", str(error) + ) logger.critical( - "Backup (pid: " + pid_str + ") has been running for logger than: " + str( + "Backup (pid: " + + pid_str + + ") has been running for longer than: " + + str( humanfriendly.format_timespan( - backup_options.get('pid_runtime_warning')))) + backup_options.get("pid_runtime_warning") + ) + ) + ) except pid.PidFileUnreadableError as error: logger.warning("Pid file can not be read: " + str(error)) diff --git a/mysql_autoxtrabackup/backup_backup/backup_builder.py b/mysql_autoxtrabackup/backup_backup/backup_builder.py index 53aec49..c40422d 100644 --- a/mysql_autoxtrabackup/backup_backup/backup_builder.py +++ b/mysql_autoxtrabackup/backup_backup/backup_builder.py @@ -214,7 +214,7 @@ def inc_backup_command_builder( self, recent_full_bck: Optional[str], inc_backup_dir: Optional[str], - recent_inc_bck: Union[str, None] = None, + recent_inc_bck: Optional[str] = None, ) -> str: xtrabackup_inc_cmd_base = ( "{} --defaults-file={} --user={} --password={}".format( diff --git a/mysql_autoxtrabackup/backup_backup/backuper.py b/mysql_autoxtrabackup/backup_backup/backuper.py index 0eb7224..67f0e74 100755 --- a/mysql_autoxtrabackup/backup_backup/backuper.py +++ b/mysql_autoxtrabackup/backup_backup/backuper.py @@ -94,8 +94,9 @@ def add_tag( return True @staticmethod - def show_tags(backup_dir: str) -> None: - if os.path.isfile("{}/backup_tags.txt".format(backup_dir)): + def show_tags(backup_dir: str, tag_file: Optional[str] = None) -> Optional[bool]: + tag_file = tag_file or "{}/backup_tags.txt".format(backup_dir) + if os.path.isfile(tag_file): with open("{}/backup_tags.txt".format(backup_dir), "r") as backup_tags: from_file = backup_tags.read() column_names = "{0}\t{1}\t{2}\t{3}\t{4}\tTAG\n".format( @@ -108,6 +109,7 @@ def show_tags(backup_dir: str) -> None: 
extra_str = "{}\n".format("-" * (len(column_names) + 21)) print(column_names + extra_str + from_file) logger.info(column_names + extra_str + from_file) + return True else: logger.warning( "Could not find backup_tags.txt inside given backup directory. Can't print tags." @@ -115,29 +117,35 @@ def show_tags(backup_dir: str) -> None: print( "WARNING: Could not find backup_tags.txt inside given backup directory. Can't print tags." ) + return None - def last_full_backup_date(self) -> bool: + def last_full_backup_date( + self, path: Optional[str] = None, full_backup_interval: Optional[float] = None + ) -> bool: """ Check if last full backup date retired or not. :return: True if last full backup date older than given interval, False if it is newer. """ # Finding last full backup date from dir/folder name - max_dir = helpers.get_latest_dir_name( - str(self.builder_obj.backup_options.get("full_dir")) + full_dir = path or str(self.builder_obj.backup_options.get("full_dir")) + backup_interval = full_backup_interval or str( + self.builder_obj.backup_options.get("full_backup_interval") ) + max_dir = helpers.get_latest_dir_name(full_dir) + dir_date = datetime.strptime(str(max_dir), "%Y-%m-%d_%H-%M-%S") now = datetime.now() - return float((now - dir_date).total_seconds()) >= float( - str(self.builder_obj.backup_options.get("full_backup_interval")) - ) + return float((now - dir_date).total_seconds()) >= float(backup_interval) def clean_full_backup_dir( - self, remove_all: Union[bool, None] = None - ) -> Union[None, bool]: + self, + full_dir: Optional[str] = None, + remove_all: Optional[bool] = None, + ) -> Optional[bool]: # Deleting old full backup after taking new full backup. # Keeping the latest in order not to lose everything. 
logger.info("starting clean_full_backup_dir") - full_dir = str(self.builder_obj.backup_options.get("full_dir")) + full_dir = full_dir or str(self.builder_obj.backup_options.get("full_dir")) if not os.path.isdir(full_dir): return True if remove_all: @@ -155,9 +163,9 @@ def clean_full_backup_dir( logger.info("KEEPING {}".format(rm_dir)) return True - def clean_inc_backup_dir(self) -> Union[None, bool]: + def clean_inc_backup_dir(self, inc_dir: Optional[str] = None) -> Optional[bool]: # Deleting incremental backups after taking new fresh full backup. - inc_dir = str(self.builder_obj.backup_options.get("inc_dir")) + inc_dir = inc_dir or str(self.builder_obj.backup_options.get("inc_dir")) if not os.path.isdir(inc_dir): return True for i in os.listdir(inc_dir): @@ -214,6 +222,11 @@ def inc_backup(self) -> bool: recent_full_bck = helpers.get_latest_dir_name( str(self.builder_obj.backup_options.get("full_dir")) ) + if not recent_full_bck: + raise RuntimeError( + "Failed to get Full backup path. Are you sure you have one?" 
+ ) + # Get the recent incremental backup path recent_inc_bck = helpers.get_latest_dir_name( str(self.builder_obj.backup_options.get("inc_dir")) diff --git a/mysql_autoxtrabackup/utils/helpers.py b/mysql_autoxtrabackup/utils/helpers.py index c5bd322..b615963 100644 --- a/mysql_autoxtrabackup/utils/helpers.py +++ b/mysql_autoxtrabackup/utils/helpers.py @@ -51,13 +51,16 @@ def get_directory_size(path: str) -> int: return total_size -def create_backup_directory(directory: str) -> str: +def create_backup_directory(directory: str, forced_dir: Optional[str] = None) -> str: """ Function for creating timestamped directory on given path :param directory: Directory path + :param forced_dir: Full Directory path forced to be created :return: Created new directory path """ new_dir = os.path.join(directory, datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) + if forced_dir: + new_dir = os.path.join(directory, forced_dir) try: # Creating directory os.makedirs(new_dir) diff --git a/mysql_autoxtrabackup/utils/version.py b/mysql_autoxtrabackup/utils/version.py index 5bfc8fb..2fd781c 100644 --- a/mysql_autoxtrabackup/utils/version.py +++ b/mysql_autoxtrabackup/utils/version.py @@ -1,3 +1,3 @@ -__all__ = 'VERSION' +__all__ = "VERSION" -VERSION = '2.0.1' +VERSION = "2.0.2" diff --git a/tests/conftest.py b/tests/conftest.py index 5d27433..54b0c88 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,18 @@ import pytest +from fastapi.testclient import TestClient + +from mysql_autoxtrabackup.api.main import app from mysql_autoxtrabackup.backup_backup.backuper import Backup bck_obj = Backup() +client = TestClient(app) @pytest.fixture() def return_bck_obj(): return bck_obj + + +@pytest.fixture() +def fastapi_client(): + return client diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..e94d9b1 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,21 @@ +class TestAPI: + def test_take_backup(self, fastapi_client): + response = 
fastapi_client.post("/backup") + assert response.status_code == 201 + assert response.json() == {"result": "Successfully finished the backup process"} + + def test_prepare_backup(self, fastapi_client): + response = fastapi_client.post("/prepare") + assert response.status_code == 200 + assert response.json() == {"result": "Successfully prepared all the backups"} + + def test_list_backups(self, fastapi_client): + response = fastapi_client.get("/backups") + assert response.status_code == 200 + + def test_delete_backups(self, fastapi_client): + response = fastapi_client.delete("/delete") + assert response.status_code == 200 + assert response.json() == { + "result": "There is no backups or backups removed successfully" + } diff --git a/tests/test_backup.py b/tests/test_backup.py index d5365f3..6911f23 100644 --- a/tests/test_backup.py +++ b/tests/test_backup.py @@ -6,11 +6,6 @@ @pytest.mark.usefixtures("return_bck_obj") class TestBackup: - def test_create_mysql_client_command(self, return_bck_obj): - result = '/usr/bin/mysql --defaults-file= -uroot --password=12345 --socket=/var/lib/mysql/mysql.sock -e "select 1"' - sql = "select 1" - assert return_bck_obj.create_mysql_client_command(sql) == result - def test_full_backup_without_tag(self, return_bck_obj): return_bck_obj.clean_full_backup_dir() return_bck_obj.full_backup() @@ -23,6 +18,101 @@ def test_full_backup_with_tag(self, return_bck_obj): # Making it None back for global object return_bck_obj.tag = None # Check if the backup tag file is created and contains given string - assert os.path.isfile("{}/backup_tags.txt".format(return_bck_obj.backupdir)) - with open("{}/backup_tags.txt".format(return_bck_obj.backupdir), "r") as file: + assert os.path.isfile( + "{}/backup_tags.txt".format( + return_bck_obj.builder_obj.backup_options.get("backup_dir") + ) + ) + with open( + "{}/backup_tags.txt".format( + return_bck_obj.builder_obj.backup_options.get("backup_dir") + ), + "r", + ) as file: assert "My first full backup" in 
file.read() + + def test_full_backup_dry_run(self, return_bck_obj): + return_bck_obj.dry = True + assert return_bck_obj.full_backup() is True + + def test_show_tags_with_wrong_file_name(self, return_bck_obj): + assert ( + return_bck_obj.show_tags( + return_bck_obj.builder_obj.backup_options.get("backup_dir"), "dummy.txt" + ) + is None + ) + + def test_show_tags_with_correct_file_name(self, return_bck_obj): + assert ( + return_bck_obj.show_tags( + return_bck_obj.builder_obj.backup_options.get("backup_dir") + ) + is True + ) + + def test_last_full_backup_date(self, return_bck_obj): + os.makedirs("tests/DELETE_ME", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-48-31", mode=777, exist_ok=True) + assert ( + return_bck_obj.last_full_backup_date( + path=f"{os.path.dirname(__file__)}/DELETE_ME", full_backup_interval=60 + ) + is True + ) + assert ( + return_bck_obj.last_full_backup_date( + path=f"{os.path.dirname(__file__)}/DELETE_ME", + full_backup_interval=6000000, + ) + is False + ) + + def test_clean_full_backup_dir_dummy_path(self, return_bck_obj): + assert ( + return_bck_obj.clean_full_backup_dir(full_dir="NON_EXISTING_PATH_NAME") + is True + ) + + def test_clean_full_backup_dir_real_path(self, return_bck_obj): + os.makedirs("tests/DELETE_ME", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-48-31", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-47-31", mode=777, exist_ok=True) + assert ( + return_bck_obj.clean_full_backup_dir( + full_dir=f"{os.path.dirname(__file__)}/DELETE_ME" + ) + is True + ) + for file_ in os.listdir(f"{os.path.dirname(__file__)}/DELETE_ME"): + assert file_ == "2021-05-06_11-48-31" + + def test_clean_full_backup_dir_with_remove_all(self, return_bck_obj): + os.makedirs("tests/DELETE_ME", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-48-31", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-47-31", mode=777, exist_ok=True) + assert ( + 
return_bck_obj.clean_full_backup_dir( + full_dir=f"{os.path.dirname(__file__)}/DELETE_ME", remove_all=True + ) + is True + ) + assert len(os.listdir(f"{os.path.dirname(__file__)}/DELETE_ME")) == 0 + + def test_clean_inc_backup_dir_with_dummy_path(self, return_bck_obj): + assert ( + return_bck_obj.clean_inc_backup_dir(inc_dir="NON_EXISTING_PATH_NAME") + is True + ) + + def test_clean_inc_backup_dir_real_path(self, return_bck_obj): + os.makedirs("tests/DELETE_ME", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-48-31", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-47-31", mode=777, exist_ok=True) + assert ( + return_bck_obj.clean_inc_backup_dir( + inc_dir=f"{os.path.dirname(__file__)}/DELETE_ME" + ) + is True + ) + assert len(os.listdir(f"{os.path.dirname(__file__)}/DELETE_ME")) == 0 diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..e42cbee --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,21 @@ +import os +import shutil + +from mysql_autoxtrabackup.utils import helpers + + +class TestHelpers: + def test_get_latest_dir_name(self): + os.makedirs("tests/DELETE_ME", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-48-31", mode=777, exist_ok=True) + os.makedirs("tests/DELETE_ME/2021-05-06_11-47-31", mode=777, exist_ok=True) + + assert ( + helpers.get_latest_dir_name(path=f"{os.path.dirname(__file__)}/DELETE_ME") + == "2021-05-06_11-48-31" + ) + + def test_create_backup_directory(self): + path_ = f"{os.path.dirname(__file__)}/DELETE_ME" + assert helpers.create_backup_directory(path_, "TEST_DIR") == f"{path_}/TEST_DIR" + shutil.rmtree(f"{path_}/TEST_DIR") diff --git a/tests/test_mysql_cli.py b/tests/test_mysql_cli.py new file mode 100644 index 0000000..acfbe88 --- /dev/null +++ b/tests/test_mysql_cli.py @@ -0,0 +1,9 @@ +class TestMySQLCLi: + def test_create_mysql_client_command(self, return_bck_obj): + result = '/usr/bin/mysql --defaults-file= -uroot 
--password=12345 --socket=/var/run/mysqld/mysqld.sock -e "select 1"' + sql = "select 1" + assert return_bck_obj.mysql_cli.create_mysql_client_command(sql) == result + + def test_mysql_run_command(self, return_bck_obj): + sql = "select 1" + assert return_bck_obj.mysql_cli.mysql_run_command(sql) is True