Skip to content

Commit

Permalink
Add function to only update not recreate component definition if it e…
Browse files Browse the repository at this point in the history
…xists

Signed-off-by: Sophia Wang <[email protected]>
  • Loading branch information
huiwangredhat committed Jan 3, 2025
1 parent f3f26fc commit 44a74fb
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 71 deletions.
83 changes: 13 additions & 70 deletions trestlebot/cli/commands/sync_cac_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,12 @@

"""Module for sync cac content command"""
import logging
from typing import Any, List
from typing import Any

import click

from trestlebot import const
from trestlebot.cli.options.common import common_options, git_options, handle_exceptions
from trestlebot.cli.utils import run_bot
from trestlebot.tasks.authored.compdef import (
AuthoredComponentDefinition,
FilterByProfile,
)
from trestlebot.tasks.base_task import ModelFilter, TaskBase
from trestlebot.tasks.regenerate_task import RegenerateTask
from trestlebot.tasks.rule_transform_task import RuleTransformTask
from trestlebot.transformers.cac_transformer import get_component_title
from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer
from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,77 +51,30 @@
required=False,
default="service",
)
@click.option(
"--markdown-dir",
type=str,
help="Directory name to store markdown files.",
required=True,
)
@handle_exceptions
def sync_cac_content_cmd(ctx: click.Context, **kwargs: Any) -> None:
"""Transform CaC content to OSCAL component definition."""
# Steps:
# 1. Check options, logger errors if any and exit.
# 2. Create a new task to run the data transformation.
# 3. Initialize a Trestlebot object and run the task(s).
# 2. Initial product component definition
# 3. Create a new task to run the data transformation.
# 4. Initialize a Trestlebot object and run the task(s).

pre_tasks: List[TaskBase] = []
# pre_tasks: List[TaskBase] = []

oscal_profile = kwargs["oscal_profile"]
compdef_name = "cac-components"
product = kwargs["product"]
cac_content_root = kwargs["cac_content_root"]
component_title = get_component_title(product, cac_content_root)
component_description = kwargs["product"]
filter_by_profile = kwargs.get("filter_by_profile")
component_definition_type = kwargs.get("component_definition_type", "service")
markdown_dir = kwargs["markdown_dir"]
repo_path = kwargs["repo_path"]
if filter_by_profile:
filter_by_profile = FilterByProfile(repo_path, filter_by_profile)
working_dir = kwargs["repo_path"]

authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition(
trestle_root=repo_path,
trestle_root=working_dir,
)
authored_comp.create_new_default(
profile_name=oscal_profile,
compdef_name=compdef_name,
comp_title=component_title,
authored_comp.create_update_cac_compdef(
comp_description=component_description,
comp_type=component_definition_type,
filter_by_profile=filter_by_profile,
)
logger.info(f"Component definition name is: {component_title}.")

transformer: ToRulesYAMLTransformer = ToRulesYAMLTransformer()

model_filter: ModelFilter = ModelFilter(
[], [compdef_name, component_title, f"{const.RULE_PREFIX}*"]
product=product,
cac_content_root=cac_content_root,
working_dir=working_dir,
)
logger.info(f"model_filter is: {model_filter}.")

rule_transform_task: RuleTransformTask = RuleTransformTask(
working_dir=repo_path,
rules_view_dir=const.RULES_VIEW_DIR,
rule_transformer=transformer,
model_filter=model_filter,
)
logger.info(
f"Profile to filter controls in the component files is: {filter_by_profile}."
)
logger.debug(
f"Oscal profile in use with the component definition is: {oscal_profile}."
)
logger.debug(f"Component definition type is {component_definition_type}.")

pre_tasks.append(rule_transform_task)

regenerate_task: RegenerateTask = RegenerateTask(
authored_object=authored_comp,
markdown_dir=markdown_dir,
model_filter=model_filter,
)
pre_tasks.append(regenerate_task)

run_bot(pre_tasks, kwargs)

logger.debug(f"You have successfully authored the {compdef_name}.")
72 changes: 72 additions & 0 deletions trestlebot/tasks/authored/compdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

"""Trestle Bot functions for component definition authoring"""

import json
import logging
import os
import pathlib
from typing import Callable, List, Optional
Expand All @@ -13,14 +15,24 @@
from trestle.common.err import TrestleError
from trestle.common.model_utils import ModelUtils
from trestle.core.catalog.catalog_interface import CatalogInterface
from trestle.core.generators import generate_sample_model
from trestle.core.profile_resolver import ProfileResolver
from trestle.core.repository import AgileAuthoring
from trestle.oscal.component import (
ComponentDefinition,
ControlImplementation,
DefinedComponent,
)

from trestlebot.const import RULE_PREFIX, RULES_VIEW_DIR, YAML_EXTENSION
from trestlebot.tasks.authored.base_authored import (
AuthoredObjectBase,
AuthoredObjectException,
)
from trestlebot.transformers.cac_transformer import (
get_component_title,
update_component_definition,
)
from trestlebot.transformers.trestle_rule import (
ComponentInfo,
Control,
Expand All @@ -30,6 +42,9 @@
from trestlebot.transformers.yaml_transformer import FromRulesYAMLTransformer


logger = logging.getLogger(__name__)


class FilterByProfile:
"""Filter controls by a profile."""

Expand Down Expand Up @@ -158,6 +173,63 @@ def create_new_default(
)
rules_view_builder.write_to_yaml(rule_dir)

def create_update_cac_compdef(
self,
comp_description: str,
comp_type: str,
product: str,
cac_content_root: str,
working_dir: str,
) -> None:
"""Create component definition for cac content
Args:
comp_description: Description of the component
comp_type: Type of the component
product: Product name for the component
cac_content_root: ComplianceAsCode repo path
working_dir: workplace repo path
"""
# Initial component definition fields
component_definition = generate_sample_model(ComponentDefinition)
component_definition.metadata.title = f"Component definition for {product}"
component_definition.metadata.version = "1.0"
component_definition.components = list()
oscal_component = generate_sample_model(DefinedComponent)
oscal_component.title = get_component_title(product, cac_content_root)
oscal_component.type = comp_type
oscal_component.description = comp_description

# Create all of the component properties for rules
# This part will be updated in CPLYTM-218
"""
rules: List[RuleInfo] = self.rules_transformer.get_all_rules()
all_rule_properties: List[Property] = self.rules_transformer.transform(rules)
oscal_component.props = none_if_empty(all_rule_properties)
"""
repo_path = pathlib.Path(working_dir)
out_path = repo_path.joinpath(f"{const.MODEL_DIR_COMPDEF}/{product}/")
oname = "component-definition.json"
ofile = out_path / oname
if ofile.exists():
logger.info(f"The component for product {product} exists.")
with open(ofile, "r", encoding="utf-8") as f:
data = json.load(f)
for component in data["component-definition"]["components"]:
if component.get("title") == get_component_title(
product, cac_content_root
):
logger.info(f"Update the exsisting component definition.")
# Need to update props parts if the rules updated
# Update the version and last modify time
update_component_definition(ofile)
else:
logger.info(f"Creating component definition for product {product}")
out_path.mkdir(exist_ok=True, parents=True)
ofile = out_path / oname
component_definition.components.append(oscal_component)
component_definition.oscal_write(ofile)


class RulesViewBuilder:
"""Write TrestleRule objects to YAML files in rules view."""
Expand Down
17 changes: 16 additions & 1 deletion trestlebot/transformers/cac_transformer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2024 Red Hat, Inc.

"""Cac content Transformer for rule authoring. """
import datetime
import json

from ssg.products import load_product_yaml, product_yaml_path

Expand All @@ -17,3 +18,17 @@ def get_component_title(product_name: str, cac_path: str) -> str:
return product._primary_data.get("product")
else:
raise ValueError("component_title is empty or None")


def update_component_definition(compdef_file) -> str:
# Update the component definition version and modify time
with open(compdef_file, "r", encoding="utf-8") as f:
data = json.load(f)
current_version = data["component-definition"]["metadata"]["version"]
data["component-definition"]["metadata"]["version"] = str(
"{:.1f}".format(float(current_version) + 0.1)
)
current_time = datetime.datetime.now().isoformat()
data["component-definition"]["metadata"]["last-modified"] = current_time
with open(compdef_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)

0 comments on commit 44a74fb

Please sign in to comment.