Skip to content

Commit

Permalink
improve error handling and ansible group parsing of anta get from-ans…
Browse files Browse the repository at this point in the history
…ible
  • Loading branch information
mtache committed Nov 9, 2023
1 parent 6a3bf38 commit e0a6dc8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
21 changes: 13 additions & 8 deletions anta/cli/get/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from rich.pretty import pretty_repr

from anta.cli.console import console
from anta.cli.utils import parse_tags
from anta.cli.utils import ExitCode, parse_tags

from .utils import create_inventory_from_ansible, create_inventory_from_cvp, get_cv_token

Expand Down Expand Up @@ -74,7 +74,8 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw


@click.command(no_args_is_help=True)
@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False)
@click.pass_context
@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False, default="all")
@click.option(
"--ansible-inventory",
"-i",
Expand All @@ -89,17 +90,21 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw
help="Path to save inventory file",
type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=Path),
)
def from_ansible(output: Path, ansible_inventory: Path, ansible_group: str) -> None:
def from_ansible(ctx: click.Context, output: Path, ansible_inventory: Path, ansible_group: str) -> None:
"""Build ANTA inventory from an ansible inventory YAML file"""
logger.info(f"Building inventory from ansible file {ansible_inventory}")

# Create output directory
output.parent.mkdir(parents=True, exist_ok=True)
create_inventory_from_ansible(
inventory=ansible_inventory,
output_file=output,
ansible_root=ansible_group,
)
try:
create_inventory_from_ansible(
inventory=ansible_inventory,
output_file=output,
ansible_group=ansible_group,
)
except ValueError as e:
logger.error(str(e))
ctx.exit(ExitCode.USAGE_ERROR)


@click.command()
Expand Down
53 changes: 27 additions & 26 deletions anta/cli/get/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import json
import logging
from pathlib import Path
from typing import Any, Union
from typing import Any

import requests
import urllib3
import yaml

from ...inventory import AntaInventory

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from anta.inventory import AntaInventory
from anta.inventory.models import AntaInventoryHost, AntaInventoryInput

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,44 +50,47 @@ def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, contain
logger.info(f"Inventory file has been created in {out_file}")


def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_root: Union[str, None] = None) -> None:
def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_group: str = "all") -> None:
"""
Create an ANTA inventory from an Ansible inventory YAML file
Args:
inventory (str): Ansible Inventory file to read
output_file (str, optional): ANTA inventory file to generate.
ansible_root (Union[str, None], optional): Ansible group from where to extract data. Defaults to None.
inventory: Ansible Inventory file to read
output_file: ANTA inventory file to generate.
ansible_root: Ansible group from where to extract data.
"""

def deep_yaml_parsing(data: dict[str, Any], hosts: Union[None, list[dict[str, str]]] = None) -> Union[None, list[dict[str, str]]]:
def find_ansible_group(data: dict[str, Any], group: str) -> dict[str, Any] | None:
for k, v in data.items():
if isinstance(v, dict):
if k == group and ("children" in v.keys() or "hosts" in v.keys()):
return v
d = find_ansible_group(v, group)
if d is not None:
return d
return None

def deep_yaml_parsing(data: dict[str, Any], hosts: list[AntaInventoryHost] | None = None) -> list[AntaInventoryHost]:
"""Deep parsing of YAML file to extract hosts and associated IPs"""
if hosts is None:
hosts = []
for key, value in data.items():
if isinstance(value, dict) and "ansible_host" in value.keys():
hosts.append({"name": key, "host": value["ansible_host"]})
logger.info(f" * adding entry for {key}")
hosts.append(AntaInventoryHost(name=key, host=value["ansible_host"]))
elif isinstance(value, dict):
deep_yaml_parsing(value, hosts)
else:
return hosts
return hosts

i: dict[str, dict[str, Any]] = {AntaInventory.INVENTORY_ROOT_KEY: {"hosts": []}}
with open(inventory, encoding="utf-8") as inv:
ansible_inventory = yaml.safe_load(inv)
if ansible_root not in ansible_inventory.keys():
logger.error(f"Group {ansible_root} not in ansible inventory {inventory}")
raise ValueError(f"Group {ansible_root} not in ansible inventory {inventory}")
if ansible_root is None:
ansible_hosts = deep_yaml_parsing(ansible_inventory, hosts=[])
else:
ansible_hosts = deep_yaml_parsing(ansible_inventory[ansible_root], hosts=[])
if ansible_hosts is None:
ansible_hosts = []
for dev in ansible_hosts:
logger.info(f' * adding entry for {dev["name"]}')
i[AntaInventory.INVENTORY_ROOT_KEY]["hosts"].append({"host": dev["host"], "name": dev["name"]})
ansible_inventory = find_ansible_group(ansible_inventory, ansible_group)
if ansible_inventory is None:
raise ValueError(f"Group {ansible_group} not found in Ansible inventory")
ansible_hosts = deep_yaml_parsing(ansible_inventory)
i = AntaInventoryInput(hosts=ansible_hosts)
with open(output_file, "w", encoding="UTF-8") as out_fd:
out_fd.write(yaml.dump(i))
logger.info(f"Inventory file has been created in {output_file}")
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)}))
logger.info(f"ANTA device inventory file has been created in {output_file}")

0 comments on commit e0a6dc8

Please sign in to comment.