From e0a6dc8100b31302c2a5455d2fcaf5d9864426f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthieu=20T=C3=A2che?= Date: Fri, 10 Nov 2023 00:22:56 +0100 Subject: [PATCH] improve error handling and ansible group parsing of anta get from-ansible --- anta/cli/get/commands.py | 21 ++++++++++------ anta/cli/get/utils.py | 53 ++++++++++++++++++++-------------------- 2 files changed, 40 insertions(+), 34 deletions(-) diff --git a/anta/cli/get/commands.py b/anta/cli/get/commands.py index 86ca1b354..44519f33b 100644 --- a/anta/cli/get/commands.py +++ b/anta/cli/get/commands.py @@ -22,7 +22,7 @@ from rich.pretty import pretty_repr from anta.cli.console import console -from anta.cli.utils import parse_tags +from anta.cli.utils import ExitCode, parse_tags from .utils import create_inventory_from_ansible, create_inventory_from_cvp, get_cv_token @@ -74,7 +74,8 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw @click.command(no_args_is_help=True) -@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False) +@click.pass_context +@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False, default="all") @click.option( "--ansible-inventory", "-i", @@ -89,17 +90,21 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw help="Path to save inventory file", type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=Path), ) -def from_ansible(output: Path, ansible_inventory: Path, ansible_group: str) -> None: +def from_ansible(ctx: click.Context, output: Path, ansible_inventory: Path, ansible_group: str) -> None: """Build ANTA inventory from an ansible inventory YAML file""" logger.info(f"Building inventory from ansible file {ansible_inventory}") # Create output directory output.parent.mkdir(parents=True, exist_ok=True) - create_inventory_from_ansible( - inventory=ansible_inventory, - output_file=output, - ansible_root=ansible_group, - ) + try: + create_inventory_from_ansible( + inventory=ansible_inventory, + output_file=output, + ansible_group=ansible_group, + ) + except ValueError as e: + logger.error(str(e)) + ctx.exit(ExitCode.USAGE_ERROR) @click.command() diff --git a/anta/cli/get/utils.py b/anta/cli/get/utils.py index 655a04f72..fc746209f 100644 --- a/anta/cli/get/utils.py +++ b/anta/cli/get/utils.py @@ -10,15 +10,13 @@ import json import logging from pathlib import Path -from typing import Any, Union +from typing import Any import requests -import urllib3 import yaml -from ...inventory import AntaInventory - -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +from anta.inventory import AntaInventory +from anta.inventory.models import AntaInventoryHost, AntaInventoryInput logger = logging.getLogger(__name__) @@ -52,44 +50,47 @@ def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, contain logger.info(f"Inventory file has been created in {out_file}") -def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_root: Union[str, None] = None) -> None: +def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_group: str = "all") -> None: """ Create an ANTA inventory from an Ansible inventory YAML file Args: - inventory (str): Ansible Inventory file to read - output_file (str, optional): ANTA inventory file to generate. - ansible_root (Union[str, None], optional): Ansible group from where to extract data. Defaults to None. + inventory: Ansible Inventory file to read + output_file: ANTA inventory file to generate. + ansible_root: Ansible group from where to extract data. """ - def deep_yaml_parsing(data: dict[str, Any], hosts: Union[None, list[dict[str, str]]] = None) -> Union[None, list[dict[str, str]]]: + def find_ansible_group(data: dict[str, Any], group: str) -> dict[str, Any] | None: + for k, v in data.items(): + if isinstance(v, dict): + if k == group and ("children" in v.keys() or "hosts" in v.keys()): + return v + d = find_ansible_group(v, group) + if d is not None: + return d + return None + + def deep_yaml_parsing(data: dict[str, Any], hosts: list[AntaInventoryHost] | None = None) -> list[AntaInventoryHost]: """Deep parsing of YAML file to extract hosts and associated IPs""" if hosts is None: hosts = [] for key, value in data.items(): if isinstance(value, dict) and "ansible_host" in value.keys(): - hosts.append({"name": key, "host": value["ansible_host"]}) + logger.info(f" * adding entry for {key}") + hosts.append(AntaInventoryHost(name=key, host=value["ansible_host"])) elif isinstance(value, dict): deep_yaml_parsing(value, hosts) else: return hosts return hosts - i: dict[str, dict[str, Any]] = {AntaInventory.INVENTORY_ROOT_KEY: {"hosts": []}} with open(inventory, encoding="utf-8") as inv: ansible_inventory = yaml.safe_load(inv) - if ansible_root not in ansible_inventory.keys(): - logger.error(f"Group {ansible_root} not in ansible inventory {inventory}") - raise ValueError(f"Group {ansible_root} not in ansible inventory {inventory}") - if ansible_root is None: - ansible_hosts = deep_yaml_parsing(ansible_inventory, hosts=[]) - else: - ansible_hosts = deep_yaml_parsing(ansible_inventory[ansible_root], hosts=[]) - if ansible_hosts is None: - ansible_hosts = [] - for dev in ansible_hosts: - logger.info(f' * adding entry for {dev["name"]}') - i[AntaInventory.INVENTORY_ROOT_KEY]["hosts"].append({"host": dev["host"], "name": dev["name"]}) + ansible_inventory = find_ansible_group(ansible_inventory, ansible_group) + if ansible_inventory is None: + raise ValueError(f"Group {ansible_group} not found in Ansible inventory") + ansible_hosts = deep_yaml_parsing(ansible_inventory) + i = AntaInventoryInput(hosts=ansible_hosts) with open(output_file, "w", encoding="UTF-8") as out_fd: - out_fd.write(yaml.dump(i)) - logger.info(f"Inventory file has been created in {output_file}") + out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)})) + logger.info(f"ANTA device inventory file has been created in {output_file}")