Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(anta): provide test context when test inputs validation fails #579

Merged
merged 8 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions anta/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pydantic import BaseModel, ConfigDict, RootModel, ValidationError, ValidationInfo, field_validator, model_validator
from pydantic.types import ImportString
from pydantic_core import PydanticCustomError
from yaml import YAMLError, safe_load

from anta.logger import anta_log_exception
Expand Down Expand Up @@ -70,12 +71,16 @@ def instantiate_inputs(cls, data: AntaTest.Input | dict[str, Any] | None, info:
if not (isclass(test_class) and issubclass(test_class, AntaTest)):
raise ValueError(f"Could not validate inputs as no test class {test_class} is not a subclass of AntaTest")

if data is None:
return test_class.Input()
if isinstance(data, AntaTest.Input):
return data
if isinstance(data, dict):
return test_class.Input(**data)
try:
if data is None:
return test_class.Input()
if isinstance(data, dict):
return test_class.Input(**data)
except ValidationError as e:
inputs_msg = str(e).replace("\n", "\n\t")
raise PydanticCustomError("wrong_test_inputs", f"{test_class.name} test inputs are not valid: {inputs_msg}\n", {"errors": e.errors()}) from e
raise ValueError(f"Coud not instantiate inputs as type {type(data).__name__} is not valid")

@model_validator(mode="after")
Expand Down
2 changes: 1 addition & 1 deletion anta/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from httpx import ConnectError, HTTPError

from anta import __DEBUG__, aioeapi
from anta.logger import exc_to_str
from anta.models import AntaCommand
from anta.tools.misc import exc_to_str

logger = logging.getLogger(__name__)

Expand Down
18 changes: 16 additions & 2 deletions anta/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from __future__ import annotations

import logging
import traceback
from enum import Enum
from pathlib import Path
from typing import Literal, Optional

from rich.logging import RichHandler

from anta import __DEBUG__
from anta.tools.misc import exc_to_str

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,14 +87,21 @@ def setup_logging(level: LogLevel = Log.INFO, file: Path | None = None) -> None:
logger.debug("ANTA Debug Mode enabled")


def exc_to_str(exception: BaseException) -> str:
"""
Helper function that returns a human readable string from an BaseException object
"""
return f"{type(exception).__name__}{f': {exception}' if str(exception) else ''}"


def anta_log_exception(exception: BaseException, message: Optional[str] = None, calling_logger: Optional[logging.Logger] = None) -> None:
"""
Helper function to help log exceptions:
* if anta.__DEBUG__ is True then the logger.exception method is called to get the traceback
* otherwise logger.error is called

Args:
exception (BAseException): The Exception being logged
exception (BaseException): The Exception being logged
message (str): An optional message
calling_logger (logging.Logger): A logger to which the exception should be logged
if not present, the logger in this file is used.
Expand All @@ -105,3 +112,10 @@ def anta_log_exception(exception: BaseException, message: Optional[str] = None,
calling_logger.critical(f"{message}\n{exc_to_str(exception)}" if message else exc_to_str(exception))
if __DEBUG__:
calling_logger.exception(f"[ANTA Debug Mode]{f' {message}' if message else ''}", exc_info=exception)


def tb_to_str(exception: BaseException) -> str:
"""
Helper function that returns a traceback string from an BaseException object
"""
return "Traceback (most recent call last):\n" + "".join(traceback.format_tb(exception.__traceback__))
3 changes: 1 addition & 2 deletions anta/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
from rich.progress import Progress, TaskID

from anta import GITHUB_SUGGESTION
from anta.logger import anta_log_exception
from anta.logger import anta_log_exception, exc_to_str
from anta.result_manager.models import TestResult
from anta.tools.misc import exc_to_str

if TYPE_CHECKING:
from anta.device import AntaDevice
Expand Down
2 changes: 2 additions & 0 deletions anta/tests/lanz.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import annotations

from anta.decorators import skip_on_platforms
from anta.models import AntaCommand, AntaTest


Expand All @@ -24,6 +25,7 @@ class VerifyLANZ(AntaTest):
categories = ["lanz"]
commands = [AntaCommand(command="show queue-monitor length status")]

@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
command_output = self.instance_commands[0].json_output
Expand Down
35 changes: 18 additions & 17 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from ipaddress import IPv4Address, IPv4Network, IPv6Address
from typing import Any, List, Optional, Union, cast

from pydantic import BaseModel, Field, PositiveInt, model_validator, utils
from pydantic import BaseModel, Field, PositiveInt, model_validator
from pydantic.v1.utils import deep_update
from pydantic_extra_types.mac_address import MacAddress

from anta.custom_types import Afi, MultiProtocolCaps, Safi, Vni
Expand Down Expand Up @@ -125,7 +126,7 @@ def _add_bgp_routes_failure(
if route not in bgp_output:
# If missing, add it to the failure routes dictionary
failure["bgp_peers"][peer][vrf][route_type][route] = "Not found"
failure_routes = utils.deep_update(failure_routes, failure)
failure_routes = deep_update(failure_routes, failure)
continue

# Check if the route is active and valid
Expand All @@ -135,7 +136,7 @@ def _add_bgp_routes_failure(
# If the route is either inactive or invalid, add it to the failure routes dictionary
if not is_active or not is_valid:
failure["bgp_peers"][peer][vrf][route_type][route] = {"valid": is_valid, "active": is_active}
failure_routes = utils.deep_update(failure_routes, failure)
failure_routes = deep_update(failure_routes, failure)

return failure_routes

Expand Down Expand Up @@ -530,7 +531,7 @@ def test(self) -> None:
# Validate received routes
else:
failure_routes = _add_bgp_routes_failure(received_routes, bgp_routes, peer, vrf, route_type="received_routes")
failures = utils.deep_update(failures, failure_routes)
failures = deep_update(failures, failure_routes)

if not failures["bgp_peers"]:
self.result.is_success()
Expand Down Expand Up @@ -588,7 +589,7 @@ def test(self) -> None:
or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None
):
failure["bgp_peers"][peer][vrf] = {"status": "Not configured"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)
continue

# Check each capability
Expand All @@ -599,12 +600,12 @@ def test(self) -> None:
# Check if capabilities are missing
if not capability_output:
failure["bgp_peers"][peer][vrf][capability] = "not found"
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if capabilities are not advertised, received, or enabled
elif not all(capability_output.get(prop, False) for prop in ["advertised", "received", "enabled"]):
failure["bgp_peers"][peer][vrf][capability] = capability_output
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if there are any failures
if not failures["bgp_peers"]:
Expand Down Expand Up @@ -660,20 +661,20 @@ def test(self) -> None:
or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None
):
failure["bgp_peers"][peer][vrf] = {"status": "Not configured"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)
continue

bgp_output = get_value(bgp_output, "neighborCapabilities.fourOctetAsnCap")

# Check if four octet asn capabilities are found
if not bgp_output:
failure["bgp_peers"][peer][vrf] = {"fourOctetAsnCap": "not found"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if capabilities are not advertised, received, or enabled
elif not all(bgp_output.get(prop, False) for prop in ["advertised", "received", "enabled"]):
failure["bgp_peers"][peer][vrf] = {"fourOctetAsnCap": bgp_output}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if there are any failures
if not failures["bgp_peers"]:
Expand Down Expand Up @@ -729,20 +730,20 @@ def test(self) -> None:
or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None
):
failure["bgp_peers"][peer][vrf] = {"status": "Not configured"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)
continue

bgp_output = get_value(bgp_output, "neighborCapabilities.routeRefreshCap")

# Check if route refresh capabilities are found
if not bgp_output:
failure["bgp_peers"][peer][vrf] = {"routeRefreshCap": "not found"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if capabilities are not advertised, received, or enabled
elif not all(bgp_output.get(prop, False) for prop in ["advertised", "received", "enabled"]):
failure["bgp_peers"][peer][vrf] = {"routeRefreshCap": bgp_output}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if there are any failures
if not failures["bgp_peers"]:
Expand Down Expand Up @@ -798,15 +799,15 @@ def test(self) -> None:
or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None
):
failure["bgp_peers"][peer][vrf] = {"status": "Not configured"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)
continue

# Check if BGP peer state and authentication
state = bgp_output.get("state")
md5_auth_enabled = bgp_output.get("md5AuthEnabled")
if state != "Established" or not md5_auth_enabled:
failure["bgp_peers"][peer][vrf] = {"state": state, "md5_auth_enabled": md5_auth_enabled}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

# Check if there are any failures
if not failures["bgp_peers"]:
Expand Down Expand Up @@ -924,14 +925,14 @@ def test(self) -> None:
or (bgp_output := get_item(bgp_output, "peerAddress", peer)) is None
):
failure["bgp_peers"][peer][vrf] = {"status": "Not configured"}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)
continue

# Verify BGP peer's advertised communities
bgp_output = bgp_output.get("advertisedCommunities")
if not bgp_output["standard"] or not bgp_output["extended"] or not bgp_output["large"]:
failure["bgp_peers"][peer][vrf] = {"advertised_communities": bgp_output}
failures = utils.deep_update(failures, failure)
failures = deep_update(failures, failure)

if not failures["bgp_peers"]:
self.result.is_success()
Expand Down
26 changes: 0 additions & 26 deletions anta/tools/misc.py

This file was deleted.

27 changes: 21 additions & 6 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ anta.tests.field_notices:
- VerifyFieldNotice44Resolution:
- VerifyFieldNotice72Resolution:

anta.tests.greent:
- VerifyGreenT:
- VerifyGreenTCounters:

anta.tests.hardware:
- VerifyTransceiversManufacturers:
manufacturers:
Expand All @@ -122,13 +126,13 @@ anta.tests.interfaces:
- VerifyInterfaceErrDisabled:
- VerifyInterfacesStatus:
interfaces:
- interface: Ethernet1
state: up
- interface: Port-Channel100
state: down
- name: Ethernet1
status: up
- name: Port-Channel100
status: down
line_protocol_status: lowerLayerDown
- interface: Ethernet49/1
state: adminDown
- name: Ethernet49/1
status: adminDown
line_protocol_status: notPresent
- VerifyStormControlDrops:
- VerifyPortChannels:
Expand Down Expand Up @@ -163,6 +167,9 @@ anta.tests.interfaces:
- VerifyIpVirtualRouterMac:
mac_address: 00:1c:73:00:dc:01

anta.tests.lanz:
- VerifyLANZ:

anta.tests.logging:
- VerifyLoggingPersistent:
- VerifyLoggingSourceIntf:
Expand Down Expand Up @@ -208,6 +215,14 @@ anta.tests.profiles:
- VerifyTcamProfile:
profile: vxlan-routing

anta.tests.ptp:
- PtpModeStatus:
- PtpPortModeStatus:
- PtpLockStatus:
- PtpGMStatus:
validGM: 0xec:46:70:ff:fe:00:ff:a9
- PtpOffset:

anta.tests.security:
- VerifySSHStatus:
- VerifySSHIPv4Acl:
Expand Down
8 changes: 4 additions & 4 deletions tests/units/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@
{
"name": "no_input_when_required",
"tests": [(FakeTestWithInput, None)],
"error": "Field required",
"error": "FakeTestWithInput test inputs are not valid: 1 validation error for Input\n\tstring\n\t Field required",
},
{
"name": "wrong_input_type",
"tests": [(FakeTestWithInput, True)],
"error": "Value error, Coud not instantiate inputs as type bool is not valid",
"tests": [(FakeTestWithInput, {"string": True})],
"error": "FakeTestWithInput test inputs are not valid: 1 validation error for Input\n\tstring\n\t Input should be a valid string",
},
]

Expand Down Expand Up @@ -243,7 +243,7 @@ def test_parse_fail_parsing(self, caplog: pytest.LogCaptureFixture) -> None:
assert len(caplog.record_tuples) >= 1
_, _, message = caplog.record_tuples[0]
assert "Unable to parse ANTA Test Catalog file" in message
assert "FileNotFoundError ([Errno 2] No such file or directory" in message
assert "FileNotFoundError: [Errno 2] No such file or directory" in message

@pytest.mark.parametrize("catalog_data", CATALOG_FROM_LIST_FAIL_DATA, ids=generate_test_ids_list(CATALOG_FROM_LIST_FAIL_DATA))
def test_from_list_fail(self, catalog_data: dict[str, Any]) -> None:
Expand Down
Loading
Loading