From ee14d55d5820a379c25f41405a8b6f083ce0ea1e Mon Sep 17 00:00:00 2001 From: Camilo Cota <1499184+ccronca@users.noreply.github.com> Date: Mon, 21 Oct 2024 09:16:52 +0200 Subject: [PATCH] Dump configuration in the results directory (#227) Dumps redacted versions of the main and default configuration files to the destination directory --- rapidast.py | 81 ++++++++++++++++++++++++++---- tests/test_dump_redacted_config.py | 68 +++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 9 deletions(-) create mode 100644 tests/test_dump_redacted_config.py diff --git a/rapidast.py b/rapidast.py index a03d0372..6ec964c1 100755 --- a/rapidast.py +++ b/rapidast.py @@ -20,6 +20,9 @@ pp = pprint.PrettyPrinter(indent=4) +DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), "rapidast-defaults.yaml") + + def load_environment(config): """Load the environment variables based on the config set in config.environ""" source = config.get("config.environ.envFile") @@ -125,6 +128,62 @@ def run_scanner(name, config, args, scan_exporter): return 0 +def dump_redacted_config(config_file_location: str, destination_dir: str) -> bool: + """ + Redacts sensitive parameters from a configuration file and writes the redacted + version to a destination directory + + Args: + config_file_location: The file path to the source configuration file + destination_dir: The directory where the redacted configuration file should be saved + + """ + logging.info("Starting the redaction and dumping process for the configuration file: {config_file_location}") + + try: + if not os.path.exists(destination_dir): + os.makedirs(destination_dir) + logging.info(f"Created destination directory: {destination_dir}") + + config = yaml.safe_load(load_config_file(config_file_location)) + + logging.info(f"Redacting sensitive information from configuration {config_file_location}") + for key in config.keys(): + if config[key].get("authentication") and config[key]["authentication"].get("parameters"): + for param in config[key]["authentication"]["parameters"]: + config[key]["authentication"]["parameters"][param] = "*****" + + dest = os.path.join(destination_dir, os.path.basename(config_file_location)) + logging.info(f"Saving redacted configuration to {dest}") + with open(dest, "w", encoding="utf-8") as file: + yaml.dump(config, file) + + logging.info("Redacted configuration saved successfully") + return True + + except (FileNotFoundError, yaml.YAMLError, IOError) as e: + logging.error(f"Error occurred while dumping redacted config: {e}") + return False + + +def dump_rapidast_redacted_configs(main_config_file_location: str, destination_dir: str): + """ + Dumps redacted versions of the main and default configuration files to the destination directory. + + Args: + main_config_file_location: The file path to the main configuration file. + destination_dir: The directory where the redacted configuration files should be saved. + """ + if not dump_redacted_config(main_config_file_location, destination_dir): + logging.error("Failed to dump configuration. Exiting.") + sys.exit(2) + + if os.path.exists(DEFAULT_CONFIG_FILE): + if not dump_redacted_config(DEFAULT_CONFIG_FILE, destination_dir): + logging.error("Failed to dump configuration. Exiting.") + sys.exit(2) + + def run(): parser = argparse.ArgumentParser( description="Runs various DAST scanners against a defined target, as configured by a configuration file." @@ -153,27 +212,31 @@ def run(): args.loglevel = args.loglevel.upper() add_logging_level("VERBOSE", logging.DEBUG + 5) logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) - logging.debug(f"log level set to debug. Config file: '{parser.parse_args().config_file}'") + config_file = parser.parse_args().config_file + + logging.debug(f"log level set to debug. Config file: '{config_file}'") # Load config file try: - config = configmodel.RapidastConfigModel(yaml.safe_load(load_config_file(parser.parse_args().config_file))) + config = configmodel.RapidastConfigModel(yaml.safe_load(load_config_file(config_file))) except yaml.YAMLError as exc: - raise RuntimeError(f"YAML error in config {parser.parse_args().config_file}':\n {str(exc)}") from exc + raise RuntimeError(f"YAML error in config {config_file}':\n {str(exc)}") from exc + + full_result_dir_path = get_full_result_dir_path(config) + dump_rapidast_redacted_configs(config_file, full_result_dir_path) # Optionally adds default if file exists (will not overwrite existing entries) - default_conf = os.path.join(os.path.dirname(__file__), "rapidast-defaults.yaml") - if os.path.exists(default_conf): - logging.info(f"Loading defaults from: {default_conf}") + if os.path.exists(DEFAULT_CONFIG_FILE): + logging.info(f"Loading defaults from: {DEFAULT_CONFIG_FILE}") try: - config.merge(yaml.safe_load(load_config_file(default_conf)), preserve=True) + config.merge(yaml.safe_load(load_config_file(DEFAULT_CONFIG_FILE)), preserve=True) except yaml.YAMLError as exc: - raise RuntimeError(f"YAML error in config {default_conf}':\n {str(exc)}") from exc + raise RuntimeError(f"YAML error in config {DEFAULT_CONFIG_FILE}':\n {str(exc)}") from exc # Update to latest config schema if need be config = configmodel.converter.update_to_latest_config(config) - config.set("config.results_dir", get_full_result_dir_path(config)) + config.set("config.results_dir", full_result_dir_path) logging.debug(f"The entire loaded configuration is as follow:\n=====\n{pp.pformat(config)}\n=====") diff --git a/tests/test_dump_redacted_config.py b/tests/test_dump_redacted_config.py new file mode 100644 index 00000000..0dc9c45d --- /dev/null +++ b/tests/test_dump_redacted_config.py @@ -0,0 +1,68 @@ +from unittest.mock import mock_open +from unittest.mock import patch + +import pytest +import yaml + +from rapidast import DEFAULT_CONFIG_FILE +from rapidast import dump_rapidast_redacted_configs +from rapidast import dump_redacted_config + + +@pytest.fixture +def mock_yaml_data() -> dict: + return { + "service1": {"authentication": {"parameters": {"username": "admin", "password": "secret"}}}, + "service2": {"authentication": {"parameters": {"api_key": "123456"}}}, + } + + +@patch("yaml.safe_load") +@patch("yaml.dump") +@patch("builtins.open", new_callable=mock_open) +@patch("rapidast.load_config_file") +def test_dump_redacted_config_success( + mock_load_config_file, mock_open_func, mock_yaml_dump, mock_yaml_load, mock_yaml_data: dict +) -> None: + expected_redacted_data = { + "service1": {"authentication": {"parameters": {"username": "*****", "password": "*****"}}}, + "service2": {"authentication": {"parameters": {"api_key": "*****"}}}, + } + mock_yaml_load.return_value = mock_yaml_data + success = dump_redacted_config("config.yaml", "destination_dir") + + assert success + + mock_open_func.assert_called_once_with("destination_dir/config.yaml", "w", encoding="utf-8") + mock_yaml_dump.assert_called_once_with(expected_redacted_data, mock_open_func()) + + +@patch("rapidast.load_config_file") +def test_dump_redacted_exceptions(mock_load_config_file) -> None: + for e in (FileNotFoundError, yaml.YAMLError, IOError): + mock_load_config_file.side_effect = e + success = dump_redacted_config("invalid_config.yaml", "destination_dir") + assert not success + + +@patch("os.makedirs") +@patch("os.path.exists") +@patch("rapidast.load_config_file") +def test_dump_redacted_config_creates_destination_dir(mock_load_config_file, mock_exists, mock_os_makedirs) -> None: + # Raising a FileNotFoundError to simulate the absence of the configuration file and stop the process + mock_load_config_file.side_effect = FileNotFoundError + mock_exists.return_value = False + _ = dump_redacted_config("config.yaml", "destination_dir") + + mock_os_makedirs.assert_called_with("destination_dir") + + +@patch("os.path.exists") +@patch("rapidast.dump_redacted_config") +def test_dump_rapidast_redacted_configs(mock_dump_redacted_config, mock_exists): + mock_exists.return_value = True + dump_rapidast_redacted_configs("config.yaml", "destination_dir") + + mock_exists.assert_called_once_with(DEFAULT_CONFIG_FILE) + mock_dump_redacted_config.assert_any_call(DEFAULT_CONFIG_FILE, "destination_dir") + mock_dump_redacted_config.assert_any_call("config.yaml", "destination_dir")