From 44a74fb084a1cc92b21fd102e3c3ebbc945a930e Mon Sep 17 00:00:00 2001 From: Sophia Wang Date: Fri, 3 Jan 2025 15:11:56 +0800 Subject: [PATCH] Add function to only update not recreate component definition if it exists Signed-off-by: Sophia Wang --- trestlebot/cli/commands/sync_cac_content.py | 83 ++++----------------- trestlebot/tasks/authored/compdef.py | 72 ++++++++++++++++++ trestlebot/transformers/cac_transformer.py | 17 ++++- 3 files changed, 101 insertions(+), 71 deletions(-) diff --git a/trestlebot/cli/commands/sync_cac_content.py b/trestlebot/cli/commands/sync_cac_content.py index 196db641..6e633623 100644 --- a/trestlebot/cli/commands/sync_cac_content.py +++ b/trestlebot/cli/commands/sync_cac_content.py @@ -3,22 +3,12 @@ """Module for sync cac content command""" import logging -from typing import Any, List +from typing import Any import click -from trestlebot import const from trestlebot.cli.options.common import common_options, git_options, handle_exceptions -from trestlebot.cli.utils import run_bot -from trestlebot.tasks.authored.compdef import ( - AuthoredComponentDefinition, - FilterByProfile, -) -from trestlebot.tasks.base_task import ModelFilter, TaskBase -from trestlebot.tasks.regenerate_task import RegenerateTask -from trestlebot.tasks.rule_transform_task import RuleTransformTask -from trestlebot.transformers.cac_transformer import get_component_title -from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer +from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition logger = logging.getLogger(__name__) @@ -61,77 +51,30 @@ required=False, default="service", ) -@click.option( - "--markdown-dir", - type=str, - help="Directory name to store markdown files.", - required=True, -) @handle_exceptions def sync_cac_content_cmd(ctx: click.Context, **kwargs: Any) -> None: """Transform CaC content to OSCAL component definition.""" # Steps: # 1. Check options, logger errors if any and exit. - # 2. Create a new task to run the data transformation. - # 3. Initialize a Trestlebot object and run the task(s). + # 2. Initial product component definition + # 3. Create a new task to run the data transformation. + # 4. Initialize a Trestlebot object and run the task(s). - pre_tasks: List[TaskBase] = [] + # pre_tasks: List[TaskBase] = [] - oscal_profile = kwargs["oscal_profile"] - compdef_name = "cac-components" product = kwargs["product"] cac_content_root = kwargs["cac_content_root"] - component_title = get_component_title(product, cac_content_root) component_description = kwargs["product"] - filter_by_profile = kwargs.get("filter_by_profile") component_definition_type = kwargs.get("component_definition_type", "service") - markdown_dir = kwargs["markdown_dir"] - repo_path = kwargs["repo_path"] - if filter_by_profile: - filter_by_profile = FilterByProfile(repo_path, filter_by_profile) + working_dir = kwargs["repo_path"] + authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition( - trestle_root=repo_path, + trestle_root=working_dir, ) - authored_comp.create_new_default( - profile_name=oscal_profile, - compdef_name=compdef_name, - comp_title=component_title, + authored_comp.create_update_cac_compdef( comp_description=component_description, comp_type=component_definition_type, - filter_by_profile=filter_by_profile, - ) - logger.info(f"Component definition name is: {component_title}.") - - transformer: ToRulesYAMLTransformer = ToRulesYAMLTransformer() - - model_filter: ModelFilter = ModelFilter( - [], [compdef_name, component_title, f"{const.RULE_PREFIX}*"] + product=product, + cac_content_root=cac_content_root, + working_dir=working_dir, ) - logger.info(f"model_filter is: {model_filter}.") - - rule_transform_task: RuleTransformTask = RuleTransformTask( - working_dir=repo_path, - rules_view_dir=const.RULES_VIEW_DIR, - rule_transformer=transformer, - model_filter=model_filter, - ) - logger.info( - f"Profile to filter controls in the component files is: {filter_by_profile}." - ) - logger.debug( - f"Oscal profile in use with the component definition is: {oscal_profile}." - ) - logger.debug(f"Component definition type is {component_definition_type}.") - - pre_tasks.append(rule_transform_task) - - regenerate_task: RegenerateTask = RegenerateTask( - authored_object=authored_comp, - markdown_dir=markdown_dir, - model_filter=model_filter, - ) - pre_tasks.append(regenerate_task) - - run_bot(pre_tasks, kwargs) - - logger.debug(f"You have successfully authored the {compdef_name}.") diff --git a/trestlebot/tasks/authored/compdef.py b/trestlebot/tasks/authored/compdef.py index 66c28af3..8e35f801 100644 --- a/trestlebot/tasks/authored/compdef.py +++ b/trestlebot/tasks/authored/compdef.py @@ -4,6 +4,8 @@ """Trestle Bot functions for component definition authoring""" +import json +import logging import os import pathlib from typing import Callable, List, Optional @@ -13,14 +15,24 @@ from trestle.common.err import TrestleError from trestle.common.model_utils import ModelUtils from trestle.core.catalog.catalog_interface import CatalogInterface +from trestle.core.generators import generate_sample_model from trestle.core.profile_resolver import ProfileResolver from trestle.core.repository import AgileAuthoring +from trestle.oscal.component import ( + ComponentDefinition, + ControlImplementation, + DefinedComponent, +) from trestlebot.const import RULE_PREFIX, RULES_VIEW_DIR, YAML_EXTENSION from trestlebot.tasks.authored.base_authored import ( AuthoredObjectBase, AuthoredObjectException, ) +from trestlebot.transformers.cac_transformer import ( + get_component_title, + update_component_definition, +) from trestlebot.transformers.trestle_rule import ( ComponentInfo, Control, @@ -30,6 +42,9 @@ from trestlebot.transformers.yaml_transformer import FromRulesYAMLTransformer +logger = logging.getLogger(__name__) + + class FilterByProfile: """Filter controls by a profile.""" @@ -158,6 +173,63 @@ def create_new_default( ) rules_view_builder.write_to_yaml(rule_dir) + def create_update_cac_compdef( + self, + comp_description: str, + comp_type: str, + product: str, + cac_content_root: str, + working_dir: str, + ) -> None: + """Create component definition for cac content + + Args: + comp_description: Description of the component + comp_type: Type of the component + product: Product name for the component + cac_content_root: ComplianceAsCode repo path + working_dir: workplace repo path + """ + # Initial component definition fields + component_definition = generate_sample_model(ComponentDefinition) + component_definition.metadata.title = f"Component definition for {product}" + component_definition.metadata.version = "1.0" + component_definition.components = list() + oscal_component = generate_sample_model(DefinedComponent) + oscal_component.title = get_component_title(product, cac_content_root) + oscal_component.type = comp_type + oscal_component.description = comp_description + + # Create all of the component properties for rules + # This part will be updated in CPLYTM-218 + """ + rules: List[RuleInfo] = self.rules_transformer.get_all_rules() + all_rule_properties: List[Property] = self.rules_transformer.transform(rules) + oscal_component.props = none_if_empty(all_rule_properties) + """ + repo_path = pathlib.Path(working_dir) + out_path = repo_path.joinpath(f"{const.MODEL_DIR_COMPDEF}/{product}/") + oname = "component-definition.json" + ofile = out_path / oname + if ofile.exists(): + logger.info(f"The component for product {product} exists.") + with open(ofile, "r", encoding="utf-8") as f: + data = json.load(f) + for component in data["component-definition"]["components"]: + if component.get("title") == get_component_title( + product, cac_content_root + ): + logger.info(f"Update the exsisting component definition.") + # Need to update props parts if the rules updated + # Update the version and last modify time + update_component_definition(ofile) + else: + logger.info(f"Creating component definition for product {product}") + out_path.mkdir(exist_ok=True, parents=True) + ofile = out_path / oname + component_definition.components.append(oscal_component) + component_definition.oscal_write(ofile) + class RulesViewBuilder: """Write TrestleRule objects to YAML files in rules view.""" diff --git a/trestlebot/transformers/cac_transformer.py b/trestlebot/transformers/cac_transformer.py index 4c3b2a25..4863b974 100644 --- a/trestlebot/transformers/cac_transformer.py +++ b/trestlebot/transformers/cac_transformer.py @@ -1,7 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # Copyright (c) 2024 Red Hat, Inc. -"""Cac content Transformer for rule authoring. """ +import datetime +import json from ssg.products import load_product_yaml, product_yaml_path @@ -17,3 +18,17 @@ def get_component_title(product_name: str, cac_path: str) -> str: return product._primary_data.get("product") else: raise ValueError("component_title is empty or None") + + +def update_component_definition(compdef_file) -> str: + # Update the component definition version and modify time + with open(compdef_file, "r", encoding="utf-8") as f: + data = json.load(f) + current_version = data["component-definition"]["metadata"]["version"] + data["component-definition"]["metadata"]["version"] = str( + "{:.1f}".format(float(current_version) + 0.1) + ) + current_time = datetime.datetime.now().isoformat() + data["component-definition"]["metadata"]["last-modified"] = current_time + with open(compdef_file, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2)