Skip to content

Commit

Permalink
Merge branch 'main' into issue_822
Browse files Browse the repository at this point in the history
  • Loading branch information
vitthalmagadum authored Sep 27, 2024
2 parents b10c14f + 2214ff0 commit 7fa9d7f
Show file tree
Hide file tree
Showing 25 changed files with 763 additions and 115 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/code-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,20 @@ jobs:
run: pip install .[doc]
- name: "Build mkdocs documentation offline"
run: mkdocs build
benchmarks:
name: Benchmark ANTA for Python 3.12
runs-on: ubuntu-latest
needs: [test-python]
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install .[dev]
- name: Run benchmarks
uses: CodSpeedHQ/action@v3
with:
token: ${{ secrets.CODSPEED_TOKEN }}
run: pytest --codspeed --no-cov --log-cli-level INFO tests/benchmark
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ repos:
- types-pyOpenSSL
- pylint_pydantic
- pytest
- pytest-codspeed
- respx

- repo: https://github.com/codespell-project/codespell
Expand Down
8 changes: 7 additions & 1 deletion anta/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
from anta.models import AntaTest

if TYPE_CHECKING:
import sys
from types import ModuleType

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

logger = logging.getLogger(__name__)

# { <module_name> : [ { <test_class_name>: <input_as_dict_or_None> }, ... ] }
Expand Down Expand Up @@ -123,7 +129,7 @@ def instantiate_inputs(
raise ValueError(msg)

@model_validator(mode="after")
def check_inputs(self) -> AntaTestDefinition:
def check_inputs(self) -> Self:
"""Check the `inputs` field typing.
The `inputs` class attribute needs to be an instance of the AntaTest.Input subclass defined in the class `test`.
Expand Down
2 changes: 1 addition & 1 deletion anta/tests/field_notices.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,4 @@ def test(self) -> None:
self.result.is_success("FN72 is mitigated")
return
# We should never hit this point
self.result.is_error("Error in running test - FixedSystemvrm1 not found")
self.result.is_failure("Error in running test - Component FixedSystemvrm1 not found in 'show version'")
4 changes: 2 additions & 2 deletions anta/tests/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test(self) -> None:
if ((duplex := (interface := interfaces["interfaces"][intf]).get("duplex", None)) is not None and duplex != duplex_full) or (
(members := interface.get("memberInterfaces", None)) is not None and any(stats["duplex"] != duplex_full for stats in members.values())
):
self.result.is_error(f"Interface {intf} or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented.")
self.result.is_failure(f"Interface {intf} or one of its member interfaces is not Full-Duplex. VerifyInterfaceUtilization has not been implemented.")
return

if (bandwidth := interfaces["interfaces"][intf]["bandwidth"]) == 0:
Expand Down Expand Up @@ -705,7 +705,7 @@ def test(self) -> None:
input_interface_detail = interface
break
else:
self.result.is_error(f"Could not find `{intf}` in the input interfaces. {GITHUB_SUGGESTION}")
self.result.is_failure(f"Could not find `{intf}` in the input interfaces. {GITHUB_SUGGESTION}")
continue

input_primary_ip = str(input_interface_detail.primary_ip)
Expand Down
5 changes: 1 addition & 4 deletions anta/tests/mlag.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ class VerifyMlagConfigSanity(AntaTest):
def test(self) -> None:
"""Main test function for VerifyMlagConfigSanity."""
command_output = self.instance_commands[0].json_output
if (mlag_status := get_value(command_output, "mlagActive")) is None:
self.result.is_error(message="Incorrect JSON response - 'mlagActive' state was not found")
return
if mlag_status is False:
if command_output["mlagActive"] is False:
self.result.is_skipped("MLAG is disabled")
return
keys_to_verify = ["globalConfiguration", "interfaceConfiguration"]
Expand Down
18 changes: 13 additions & 5 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from ipaddress import IPv4Address, IPv4Network, IPv6Address
from typing import Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar

from pydantic import BaseModel, Field, PositiveInt, model_validator
from pydantic.v1.utils import deep_update
Expand All @@ -18,6 +18,14 @@
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_item, get_value

if TYPE_CHECKING:
import sys

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


def _add_bgp_failures(failures: dict[tuple[str, str | None], dict[str, Any]], afi: Afi, safi: Safi | None, vrf: str, issue: str | dict[str, Any]) -> None:
"""Add a BGP failure entry to the given `failures` dictionary.
Expand Down Expand Up @@ -235,7 +243,7 @@ class BgpAfi(BaseModel):
"""Number of expected BGP peer(s)."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided.
Expand Down Expand Up @@ -375,7 +383,7 @@ class BgpAfi(BaseModel):
"""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided.
Expand Down Expand Up @@ -522,7 +530,7 @@ class BgpAfi(BaseModel):
"""List of BGP IPv4 or IPv6 peer."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpAfi class.
If afi is either ipv4 or ipv6, safi must be provided and vrf must NOT be all.
Expand Down Expand Up @@ -1485,7 +1493,7 @@ class BgpPeer(BaseModel):
"""Outbound route map applied, defaults to None."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the BgpPeer class.
At least one of 'inbound' or 'outbound' route-map must be provided.
Expand Down
19 changes: 14 additions & 5 deletions anta/tests/routing/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@

from functools import cache
from ipaddress import IPv4Address, IPv4Interface
from typing import ClassVar, Literal
from typing import TYPE_CHECKING, ClassVar, Literal

from pydantic import model_validator

from anta.custom_types import PositiveInteger
from anta.models import AntaCommand, AntaTemplate, AntaTest

if TYPE_CHECKING:
import sys

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


class VerifyRoutingProtocolModel(AntaTest):
"""Verifies the configured routing protocol model is the one we expect.
Expand Down Expand Up @@ -84,13 +93,13 @@ class VerifyRoutingTableSize(AntaTest):
class Input(AntaTest.Input):
"""Input model for the VerifyRoutingTableSize test."""

minimum: int
minimum: PositiveInteger
"""Expected minimum routing table size."""
maximum: int
maximum: PositiveInteger
"""Expected maximum routing table size."""

@model_validator(mode="after") # type: ignore[misc]
def check_min_max(self) -> AntaTest.Input:
@model_validator(mode="after")
def check_min_max(self) -> Self:
"""Validate that maximum is greater than minimum."""
if self.minimum > self.maximum:
msg = f"Minimum {self.minimum} is greater than maximum {self.maximum}"
Expand Down
22 changes: 15 additions & 7 deletions anta/tests/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@
# mypy: disable-error-code=attr-defined
from datetime import datetime, timezone
from ipaddress import IPv4Address
from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar, get_args

from pydantic import BaseModel, Field, model_validator

from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, PositiveInteger, RsaKeySize
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_failed_logs, get_item, get_value

if TYPE_CHECKING:
import sys

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


class VerifySSHStatus(AntaTest):
"""Verifies if the SSHD agent is disabled in the default VRF.
Expand Down Expand Up @@ -47,7 +55,7 @@ def test(self) -> None:
try:
line = next(line for line in command_output.split("\n") if line.startswith("SSHD status"))
except StopIteration:
self.result.is_error("Could not find SSH status in returned output.")
self.result.is_failure("Could not find SSH status in returned output.")
return
status = line.split("is ")[1]

Expand Down Expand Up @@ -416,19 +424,19 @@ class APISSLCertificate(BaseModel):
"""The encryption algorithm key size of the certificate."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
def validate_inputs(self) -> Self:
"""Validate the key size provided to the APISSLCertificates class.
If encryption_algorithm is RSA then key_size should be in {2048, 3072, 4096}.
If encryption_algorithm is ECDSA then key_size should be in {256, 384, 521}.
"""
if self.encryption_algorithm == "RSA" and self.key_size not in RsaKeySize.__args__:
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {RsaKeySize.__args__}."
if self.encryption_algorithm == "RSA" and self.key_size not in get_args(RsaKeySize):
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {get_args(RsaKeySize)}."
raise ValueError(msg)

if self.encryption_algorithm == "ECDSA" and self.key_size not in EcdsaKeySize.__args__:
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {EcdsaKeySize.__args__}."
if self.encryption_algorithm == "ECDSA" and self.key_size not in get_args(EcdsaKeySize):
msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {get_args(EcdsaKeySize)}."
raise ValueError(msg)

return self
Expand Down
63 changes: 62 additions & 1 deletion anta/tests/stp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations

from typing import ClassVar, Literal
from typing import Any, ClassVar, Literal

from pydantic import Field

Expand Down Expand Up @@ -259,3 +259,64 @@ def test(self) -> None:
self.result.is_failure(f"The following instance(s) have the wrong STP root priority configured: {wrong_priority_instances}")
else:
self.result.is_success()


class VerifyStpTopologyChanges(AntaTest):
"""Verifies the number of changes across all interfaces in the Spanning Tree Protocol (STP) topology is below a threshold.
Expected Results
----------------
* Success: The test will pass if the total number of changes across all interfaces is less than the specified threshold.
* Failure: The test will fail if the total number of changes across all interfaces meets or exceeds the specified threshold,
indicating potential instability in the topology.
Examples
--------
```yaml
anta.tests.stp:
- VerifyStpTopologyChanges:
threshold: 10
```
"""

name = "VerifyStpTopologyChanges"
description = "Verifies the number of changes across all interfaces in the Spanning Tree Protocol (STP) topology is below a threshold."
categories: ClassVar[list[str]] = ["stp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show spanning-tree topology status detail", revision=1)]

class Input(AntaTest.Input):
"""Input model for the VerifyStpTopologyChanges test."""

threshold: int
"""The threshold number of changes in the STP topology."""

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyStpTopologyChanges."""
failures: dict[str, Any] = {"topologies": {}}

command_output = self.instance_commands[0].json_output
stp_topologies = command_output.get("topologies", {})

# verifies all available topologies except the "NoStp" topology.
stp_topologies.pop("NoStp", None)

# Verify the STP topology(s).
if not stp_topologies:
self.result.is_failure("STP is not configured.")
return

# Verifies the number of changes across all interfaces
for topology, topology_details in stp_topologies.items():
interfaces = {
interface: {"Number of changes": num_of_changes}
for interface, details in topology_details.get("interfaces", {}).items()
if (num_of_changes := details.get("numChanges")) > self.inputs.threshold
}
if interfaces:
failures["topologies"][topology] = interfaces

if failures["topologies"]:
self.result.is_failure(f"The following STP topologies are not configured or number of changes not within the threshold:\n{failures}")
else:
self.result.is_success()
3 changes: 0 additions & 3 deletions anta/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class VerifyReloadCause(AntaTest):
def test(self) -> None:
"""Main test function for VerifyReloadCause."""
command_output = self.instance_commands[0].json_output
if "resetCauses" not in command_output:
self.result.is_error(message="No reload causes available")
return
if len(command_output["resetCauses"]) == 0:
# No reload causes
self.result.is_success()
Expand Down
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ dev = [
"pytest-asyncio>=0.21.1",
"pytest-cov>=4.1.0",
"pytest-dependency",
"pytest-codspeed>=2.2.0",
"respx",
"pytest-html>=3.2.0",
"pytest-httpx>=0.30.0",
"pytest-metadata>=3.0.0",
Expand Down Expand Up @@ -171,6 +173,7 @@ render_collapsed = true
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
norecursedirs = ["tests/benchmark"] # Do not run performance testing outside of Codspeed
filterwarnings = [
# cvprac is raising the next warning
"default:pkg_resources is deprecated:DeprecationWarning",
Expand Down Expand Up @@ -450,13 +453,17 @@ disable = [ # Any rule listed here can be disabled: https://github.com/astral-sh
"keyword-arg-before-vararg",
"protected-access",
"too-many-arguments",
"too-many-positional-arguments", # New in pylint 3.3.0
"too-many-positional-arguments",
"wrong-import-position",
"pointless-statement",
"broad-exception-caught",
"line-too-long",
"unused-variable",
"redefined-builtin",
"global-statement",
"reimported",
"wrong-import-order",
"wrong-import-position",
"abstract-class-instantiated", # Overlap with https://mypy.readthedocs.io/en/stable/error_code_list.html#check-instantiation-of-abstract-classes-abstract
"unexpected-keyword-arg", # Overlap with https://mypy.readthedocs.io/en/stable/error_code_list.html#check-arguments-in-calls-call-arg and other rules
"no-value-for-parameter" # Overlap with https://mypy.readthedocs.io/en/stable/error_code_list.html#check-arguments-in-calls-call-arg
Expand Down
4 changes: 4 additions & 0 deletions tests/benchmark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Benchmark tests for ANTA."""
Loading

0 comments on commit 7fa9d7f

Please sign in to comment.