From b4d1e69071d90c06b9d3f16e4268027a66bfaa86 Mon Sep 17 00:00:00 2001 From: Rajesh Duraisamy Date: Tue, 21 May 2024 11:20:07 -0700 Subject: [PATCH] Implement canary file generation functionality from contract test inputs files --- src/rpdk/core/generate.py | 4 +- src/rpdk/core/generate_canary.py | 254 +++++++++++++++++++++++++ src/rpdk/core/project.py | 12 ++ tests/test_generate_canary.py | 305 +++++++++++++++++++++++++++++++ 4 files changed, 574 insertions(+), 1 deletion(-) create mode 100644 src/rpdk/core/generate_canary.py create mode 100644 tests/test_generate_canary.py diff --git a/src/rpdk/core/generate.py b/src/rpdk/core/generate.py index 58383055..3f7ba410 100644 --- a/src/rpdk/core/generate.py +++ b/src/rpdk/core/generate.py @@ -4,6 +4,7 @@ """ import logging +from .generate_canary import CanaryFileGenerator from .project import Project LOG = logging.getLogger(__name__) @@ -20,7 +21,8 @@ def generate(args): args.profile, ) project.generate_docs() - + canary_file_generator = CanaryFileGenerator() + canary_file_generator.generate_canary_files() LOG.warning("Generated files for %s", project.type_name) diff --git a/src/rpdk/core/generate_canary.py b/src/rpdk/core/generate_canary.py new file mode 100644 index 00000000..f3655992 --- /dev/null +++ b/src/rpdk/core/generate_canary.py @@ -0,0 +1,254 @@ +import json +import logging +import re +import shutil +from pathlib import Path +from typing import Any, Dict +from uuid import uuid4 + +import yaml + +TARGET_SCHEMAS_FOLDER = "target-schemas" +TARGET_CANARY_ROOT_FOLDER = "canary-bundle" +TARGET_CANARY_FOLDER = "canary-bundle/canary" +CONTRACT_TEST_FOLDER = "contract-tests-artifacts" +RPDK_CONFIG_FILE = ".rpdk-config" +CANARY_COUNT = "canary_count" +CANARY_FILE_PREFIX = "canary" +CONTRACT_TEST_INPUT_PREFIX = "inputs_*" +CONTRACT_TEST_DEPENDENCY_FILE_NAME = "dependencies.yml" +CANARY_DEPENDENCY_FILE_NAME = "bootstrap.yaml" +FILE_GENERATION_ENABLED = "file_generation_enabled" +CANARY_SETTINGS = "canarySettings" +TYPE_NAME = "typeName" +CONTRACT_TEST_FILE_NAMES = "contract_test_file_names" +INPUT1_FILE_NAME = "inputs_1.json" +FN_SUB = "Fn::Sub" +FN_IMPORT_VALUE = "Fn::ImportValue" +UUID = "uuid" +DYNAMIC_VALUES_MAP = { + "region": "${AWS::Region}", + "partition": "${AWS::Partition}", + "account": "${AWS::AccountId}", +} + + +class CanaryFileGenerator: + def __init__(self, root=None): + self.root = Path(root) if root else Path.cwd() + self.load_rpdk_config() + + def load_rpdk_config(self) -> None: + """ + Load the RPDK configuration file. + + This method loads the RPDK configuration file from the current working directory. + If the file is not found, it logs a warning and returns an empty dictionary. + """ + config_file_path = self.rpdk_config + + if not config_file_path.is_file(): + logging.warning("RPDK configuration file not found") + self.canary_settings = {} + return + + with open(config_file_path, "r", encoding="utf-8") as f: + rpdk_config = json.load(f) + self.canary_settings = rpdk_config.get(CANARY_SETTINGS, {}) + self.type_name = rpdk_config.get(TYPE_NAME, "") + self.contract_test_file_names = self.canary_settings.get( + CONTRACT_TEST_FILE_NAMES, [INPUT1_FILE_NAME] + ) + + def generate_canary_files(self) -> None: + """ + Generate canary files based on the contract test input files. + + This method checks if file generation is enabled and if the target contract test folder exists. + If both conditions are met, it creates the canary folder, copies the contract test dependencies, + and generates canary files for each contract test input file up to the specified count. + """ + if ( + not self.is_file_generation_enabled() + or not self.contract_test_folder_exists() + ): + return + + self._setup_canary_environment() + self._generate_canary_files() + + def is_file_generation_enabled(self) -> bool: + return self.canary_settings.get(FILE_GENERATION_ENABLED, False) + + def contract_test_folder_exists(self) -> bool: + return Path(self.target_contract_test_folder_path).exists() + + def _setup_canary_environment(self) -> None: + canary_root = Path(self.target_canary_root_path) + canary_folder = Path(self.target_canary_folder_path) + self.clean_and_create_canary_folder(canary_root, canary_folder) + self.create_canary_bootstrap( + Path(self.target_contract_test_folder_path), canary_root + ) + + def _generate_canary_files(self) -> None: + resource_name = self.type_name + canary_file_name_prefix = CANARY_FILE_PREFIX + canary_folder = Path(self.target_canary_folder_path) + contract_test_files = self._get_sorted_contract_test_files() + for count, ct_file in enumerate(contract_test_files, start=1): + self.create_canary_file( + resource_name, ct_file, canary_folder, canary_file_name_prefix, count + ) + + def _get_sorted_contract_test_files(self) -> list: + contract_test_folder = Path(self.target_contract_test_folder_path) + contract_test_files = [ + file + for file in contract_test_folder.glob(CONTRACT_TEST_INPUT_PREFIX) + if file.is_file() and file.name in self.contract_test_file_names + ] + return sorted(contract_test_files) + + def clean_and_create_canary_folder( + self, canary_root: Path, canary_folder: Path + ) -> None: + """ + Clean and create the canary folder. + + This method removes the existing canary root folder and creates a new canary folder. + + Args: + canary_root (Path): The path to the canary root folder. + canary_folder (Path): The path to the canary folder. + """ + shutil.rmtree(canary_root, ignore_errors=True) + canary_folder.mkdir(parents=True, exist_ok=True) + + def create_canary_bootstrap(self, file_location: Path, canary_root: Path) -> None: + """ + Copy the contract test dependencies to the canary root folder. + + This method copies the contract test dependency file to the canary root folder + as the canary bootstrap file. + + Args: + file_location (Path): The path to the contract test folder. + canary_root (Path): The path to the canary root folder. + """ + dependencies_file = file_location / CONTRACT_TEST_DEPENDENCY_FILE_NAME + bootstrap_file = canary_root / CANARY_DEPENDENCY_FILE_NAME + + if dependencies_file.exists(): + shutil.copy(dependencies_file, bootstrap_file) + + def create_canary_file( + self, + resource_type: str, + ct_file: Path, + canary_folder: Path, + canary_file_name_prefix: str, + count: int, + ) -> None: + """ + Create a canary file based on the contract test input file. + + This method generates a canary file in YAML format based on the provided contract test input file. + The canary file contains the resource configuration with dynamic values replaced. + + Args: + resource_type (str): The type of the resource being tested. + ct_file (Path): The path to the contract test input file. + canary_folder (Path): The path to the canary folder. + canary_file_name_prefix (str): The prefix for the canary file name. + count (int): The count of the canary file being generated. + """ + with ct_file.open("r") as f: + json_data = json.load(f) + resource_name = resource_type.split("::")[2] + canary_data = { + "Description": f"Canary template for {resource_type}", + "Resources": { + f"{resource_name}Canary": { + "Type": resource_type, + "Properties": self.replace_dynamic_values( + json_data["CreateInputs"] + ), + } + }, + } + canary_file_name = f"{canary_file_name_prefix}{count}_001.yaml" + canary_file_path = canary_folder / canary_file_name + + with canary_file_path.open("w") as canary_file: + yaml.dump(canary_data, canary_file, indent=2) + + def replace_dynamic_values(self, properties: Dict[str, Any]) -> Dict[str, Any]: + """ + Replace dynamic values in the resource properties. + + This method recursively replaces dynamic values in the resource properties dictionary. + It handles nested dictionaries, lists, and strings with dynamic value placeholders. + + Args: + properties (Dict[str, Any]): The resource properties dictionary. + + Returns: + Dict[str, Any]: The resource properties dictionary with dynamic values replaced. + """ + for key, value in properties.items(): + if isinstance(value, dict): + properties[key] = self.replace_dynamic_values(value) + elif isinstance(value, list): + properties[key] = [self._replace_dynamic_value(item) for item in value] + else: + return_value = self._replace_dynamic_value(value) + properties[key] = return_value + return properties + + def _replace_dynamic_value(self, original_value: Any) -> Any: + """ + Replace a dynamic value with its corresponding value. + + This method replaces dynamic value placeholders in a string with their corresponding values. + It handles UUID generation, partition replacement, and Fn::ImportValue function. + + Args: + original_value (Any): The value to be replaced. + + Returns: + Any: The replaced value. + """ + pattern = r"\{\{(.*?)\}\}" + + def replace_token(match): + token = match.group(1) + if UUID in token: + return str(uuid4()) + if token in DYNAMIC_VALUES_MAP: + return DYNAMIC_VALUES_MAP[token] + return f'{{"{FN_IMPORT_VALUE}": "{token.strip()}"}}' + + replaced_value = re.sub(pattern, replace_token, str(original_value)) + + if any(value in replaced_value for value in DYNAMIC_VALUES_MAP.values()): + replaced_value = {FN_SUB: replaced_value} + if FN_IMPORT_VALUE in replaced_value: + replaced_value = json.loads(replaced_value) + return replaced_value + + @property + def target_canary_root_path(self): + return self.root / TARGET_CANARY_ROOT_FOLDER + + @property + def target_canary_folder_path(self): + return self.root / TARGET_CANARY_FOLDER + + @property + def target_contract_test_folder_path(self): + return self.root / CONTRACT_TEST_FOLDER + + @property + def rpdk_config(self): + return self.root / RPDK_CONFIG_FILE diff --git a/src/rpdk/core/project.py b/src/rpdk/core/project.py index 1649bdd8..696caf9d 100644 --- a/src/rpdk/core/project.py +++ b/src/rpdk/core/project.py @@ -29,6 +29,11 @@ SpecValidationError, ) from .fragment.module_fragment_reader import _get_fragment_file +from .generate_canary import ( + CONTRACT_TEST_FILE_NAMES, + FILE_GENERATION_ENABLED, + INPUT1_FILE_NAME, +) from .jsonutils.pointer import fragment_decode, fragment_encode from .jsonutils.utils import traverse from .plugin_registry import load_plugin @@ -145,6 +150,7 @@ def __init__(self, overwrite_enabled=False, root=None): self.test_entrypoint = None self.executable_entrypoint = None self.fragment_dir = None + self.canary_settings = {} self.target_info = {} self.env = Environment( @@ -277,6 +283,7 @@ def validate_and_load_resource_settings(self, raw_settings): self.executable_entrypoint = raw_settings.get("executableEntrypoint") self._plugin = load_plugin(raw_settings["language"]) self.settings = raw_settings.get("settings", {}) + self.canary_settings = raw_settings.get("canarySettings", {}) def _write_example_schema(self): self.schema = resource_json( @@ -338,6 +345,7 @@ def _write_resource_settings(f): "testEntrypoint": self.test_entrypoint, "settings": self.settings, **executable_entrypoint_dict, + "canarySettings": self.canary_settings, }, f, indent=4, @@ -391,6 +399,10 @@ def init(self, type_name, language, settings=None): self.language = language self._plugin = load_plugin(language) self.settings = settings or {} + self.canary_settings = { + FILE_GENERATION_ENABLED: True, + CONTRACT_TEST_FILE_NAMES: [INPUT1_FILE_NAME], + } self._write_example_schema() self._write_example_inputs() self._plugin.init(self) diff --git a/tests/test_generate_canary.py b/tests/test_generate_canary.py new file mode 100644 index 00000000..66cc9257 --- /dev/null +++ b/tests/test_generate_canary.py @@ -0,0 +1,305 @@ +import json +import os +import re +import shutil +import uuid +from pathlib import Path +from unittest.mock import ANY, patch + +import pytest + +from rpdk.core.generate_canary import ( + CANARY_DEPENDENCY_FILE_NAME, + CANARY_FILE_PREFIX, + CONTRACT_TEST_DEPENDENCY_FILE_NAME, + CONTRACT_TEST_FOLDER, + FILE_GENERATION_ENABLED, + RPDK_CONFIG_FILE, + TARGET_CANARY_FOLDER, + TARGET_CANARY_ROOT_FOLDER, + CanaryFileGenerator, +) + + +@pytest.fixture(name="canary_file_generator_fixture") +def setup_fixture(tmp_path): + root_path = tmp_path + rpdk_config_path = root_path / RPDK_CONFIG_FILE + rpdk_config_path.parent.mkdir(parents=True, exist_ok=True) + rpdk_config = { + "typeName": "AWS::Example::Resource", + "canarySettings": { + FILE_GENERATION_ENABLED: True, + "contract_test_file_names": ["inputs_1.json", "inputs_2.json"], + }, + } + with rpdk_config_path.open("w") as f: + json.dump(rpdk_config, f) + contract_test_folder = root_path / CONTRACT_TEST_FOLDER + contract_test_folder.mkdir(parents=True, exist_ok=True) + # Create a dummy JSON file in the canary_root_path directory + create_dummy_json_file(contract_test_folder, "inputs_1.json") + create_dummy_json_file(contract_test_folder, "inputs_2.json") + (contract_test_folder / CONTRACT_TEST_DEPENDENCY_FILE_NAME).touch() + + return CanaryFileGenerator(str(root_path)) + + +def create_dummy_json_file(directory: Path, file_name: str): + """Create a dummy JSON file in the given directory.""" + dummy_json_file = directory / file_name + dummy_data = { + "CreateInputs": { + "Property1": "Value1", + "Property2": "Value1", + } + } + with dummy_json_file.open("w") as f: + json.dump(dummy_data, f) + + +def create_folder(folder: Path): + if os.path.exists(folder): + shutil.rmtree(folder) + folder.mkdir() + + +def test_is_file_generation_enabled(canary_file_generator_fixture): + assert canary_file_generator_fixture.is_file_generation_enabled() is True + + +def test_contract_test_folder_exists(canary_file_generator_fixture): + assert canary_file_generator_fixture.contract_test_folder_exists() is True + + +def test_generate_canary_files(canary_file_generator_fixture, tmp_path): + canary_file_generator_fixture.generate_canary_files() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + assert canary_root_path.exists() + assert canary_folder_path.exists() + + canary_files = list(canary_folder_path.glob(f"{CANARY_FILE_PREFIX}*")) + assert len(canary_files) == 2 + assert canary_files[0].name == f"{CANARY_FILE_PREFIX}2_001.yaml" + assert canary_files[1].name == f"{CANARY_FILE_PREFIX}1_001.yaml" + + bootstrap_file = canary_root_path / CANARY_DEPENDENCY_FILE_NAME + assert bootstrap_file.exists() + + +@pytest.mark.usefixtures("canary_file_generator_fixture") +def test_clean_and_create_canary_folder(canary_file_generator_fixture, tmp_path): + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + canary_root_path.mkdir() + (canary_root_path / "existing_file.txt").touch() + canary_file_generator_fixture.clean_and_create_canary_folder( + canary_root_path, canary_folder_path + ) + + assert canary_root_path.exists() + assert not list(canary_folder_path.glob("*")) + assert canary_folder_path.exists() + + +def test_create_canary_bootstrap(canary_file_generator_fixture, tmp_path): + contract_test_folder = tmp_path / CONTRACT_TEST_FOLDER + create_folder(contract_test_folder) + (contract_test_folder / CONTRACT_TEST_DEPENDENCY_FILE_NAME).touch() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + create_folder(canary_root_path) + + canary_file_generator_fixture.create_canary_bootstrap( + contract_test_folder, canary_root_path + ) + + bootstrap_file = canary_root_path / CANARY_DEPENDENCY_FILE_NAME + assert bootstrap_file.exists() + + +@patch("rpdk.core.generate_canary.yaml.dump") +def test_create_canary_file(mock_yaml_dump, canary_file_generator_fixture, tmp_path): + contract_test_folder = tmp_path / CONTRACT_TEST_FOLDER + create_folder(contract_test_folder) + contract_test_file = contract_test_folder / "inputs_1.json" + contract_test_data = { + "CreateInputs": { + "Property1": "Value1", + "Property2": "{{test123}}", + "Property3": {"Nested": "{{partition}}"}, + "Property4": ["{{region}}", "Value2"], + "Property5": "{{uuid}}", + "Property6": "{{account}}", + } + } + with contract_test_file.open("w") as f: + json.dump(contract_test_data, f) + + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + canary_folder_path.mkdir(parents=True, exist_ok=True) + + canary_file_generator_fixture.create_canary_file( + "AWS::Example::Resource", + contract_test_file, + canary_folder_path, + CANARY_FILE_PREFIX, + 1, + ) + + expected_canary_data = { + "Description": "Canary template for AWS::Example::Resource", + "Resources": { + "ResourceCanary": { + "Type": "AWS::Example::Resource", + "Properties": { + "Property1": "Value1", + "Property2": {"Fn::ImportValue": ANY}, + "Property3": {"Nested": {"Fn::Sub": "${AWS::Partition}"}}, + "Property4": [{"Fn::Sub": "${AWS::Region}"}, "Value2"], + "Property5": ANY, + "Property6": {"Fn::Sub": "${AWS::AccountId}"}, + }, + } + }, + } + mock_yaml_dump.assert_called_once_with(expected_canary_data, ANY, indent=2) + args, kwargs = mock_yaml_dump.call_args + assert args[0] == expected_canary_data + assert kwargs + # Assert UUID generation + replaced_properties = args[0]["Resources"]["ResourceCanary"]["Properties"] + assert isinstance(replaced_properties["Property5"], str) + assert len(replaced_properties["Property5"]) == 36 # Standard UUID length + assert re.match( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + replaced_properties["Property5"], + ) + + # Assert the generated UUID is a valid UUID + generated_uuid = replaced_properties["Property5"] + assert uuid.UUID(generated_uuid) + + +def test_replace_dynamic_values(canary_file_generator_fixture): + properties = { + "Property1": "Value1", + "Property2": "{{uuid}}", + "Property3": {"Nested": "{{partition}}"}, + "Property4": ["{{region}}", "Value2"], + "Property5": [{"Key": "{{uuid}}"}], + "Property6": "{{account}}", + "Property7": "prefix-{{uuid}}-sufix", + "Property8": "{{value8}}", + } + replaced_properties = canary_file_generator_fixture.replace_dynamic_values( + properties + ) + assert replaced_properties["Property1"] == "Value1" + assert isinstance(replaced_properties["Property2"], str) + assert len(replaced_properties["Property2"]) == 36 + assert re.match( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + replaced_properties["Property2"], + ) + assert replaced_properties["Property3"]["Nested"] == { + "Fn::Sub": "${AWS::Partition}" + } + assert replaced_properties["Property4"][0] == {"Fn::Sub": "${AWS::Region}"} + assert replaced_properties["Property4"][1] == "Value2" + assert replaced_properties["Property6"] == {"Fn::Sub": "${AWS::AccountId}"} + property7_value = replaced_properties["Property7"] + # Assert the replaced value + assert isinstance(property7_value, str) + assert "prefix-" in property7_value + assert "-sufix" in property7_value + # Extract the UUID part + property7_value = property7_value.replace("prefix-", "").replace("-sufix", "") + # Assert the UUID format + assert len(property7_value) == 36 # Standard UUID length + assert re.match( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", property7_value + ) + # Assert the UUID is a valid UUID + assert uuid.UUID(property7_value) + assert replaced_properties["Property8"] == {"Fn::ImportValue": "value8"} + + +def setup_rpdk_config(tmp_path, rpdk_config): + root_path = tmp_path + rpdk_config_path = root_path / RPDK_CONFIG_FILE + rpdk_config_path.parent.mkdir(parents=True, exist_ok=True) + with rpdk_config_path.open("w") as f: + json.dump(rpdk_config, f) + contract_test_folder = root_path / CONTRACT_TEST_FOLDER + contract_test_folder.mkdir(parents=True, exist_ok=True) + # Create a dummy JSON file in the canary_root_path directory + create_dummy_json_file(contract_test_folder, "inputs_1.json") + create_dummy_json_file(contract_test_folder, "inputs_2.json") + (contract_test_folder / CONTRACT_TEST_DEPENDENCY_FILE_NAME).touch() + return CanaryFileGenerator(str(root_path)) + + +def test_generate_canary_files_when_not_enabled(tmp_path): + rpdk_config = { + "typeName": "AWS::Example::Resource", + "canarySettings": { + FILE_GENERATION_ENABLED: False, + "contract_test_file_names": ["inputs_1.json", "inputs_2.json"], + }, + } + canary_file_generator_not_enabled_fixture = setup_rpdk_config(tmp_path, rpdk_config) + canary_file_generator_not_enabled_fixture.generate_canary_files() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + assert not canary_root_path.exists() + assert not canary_folder_path.exists() + + +def test_generate_canary_files_no_canary_settings(tmp_path): + rpdk_config = { + "typeName": "AWS::Example::Resource", + } + canary_file_generator_not_enabled_fixture = setup_rpdk_config(tmp_path, rpdk_config) + canary_file_generator_not_enabled_fixture.generate_canary_files() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + assert not canary_root_path.exists() + assert not canary_folder_path.exists() + + +def test_generate_canary_files_empty_input_files(tmp_path): + rpdk_config = { + "typeName": "AWS::Example::Resource", + "canarySettings": { + FILE_GENERATION_ENABLED: True, + "contract_test_file_names": [], + }, + } + canary_file_generator_not_enabled_fixture = setup_rpdk_config(tmp_path, rpdk_config) + canary_file_generator_not_enabled_fixture.generate_canary_files() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + assert canary_root_path.exists() + assert canary_folder_path.exists() + canary_files = list(canary_folder_path.glob(f"{CANARY_FILE_PREFIX}*")) + assert not canary_files + + +def test_generate_canary_files_empty_canary_settings(tmp_path): + rpdk_config = { + "typeName": "AWS::Example::Resource", + "canarySettings": {}, + } + canary_file_generator_not_enabled_fixture = setup_rpdk_config(tmp_path, rpdk_config) + canary_file_generator_not_enabled_fixture.generate_canary_files() + + canary_root_path = tmp_path / TARGET_CANARY_ROOT_FOLDER + canary_folder_path = tmp_path / TARGET_CANARY_FOLDER + assert not canary_root_path.exists() + assert not canary_folder_path.exists()