Skip to content

Commit

Permalink
Merge branch 'main' into issue_853
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon authored Jan 20, 2025
2 parents 7886dbd + 3f76153 commit 6ceb261
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ repos:
- '<!--| ~| -->'

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.1
rev: v0.9.2
hooks:
- id: ruff
name: Run Ruff linter
Expand Down
1 change: 1 addition & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,4 @@ def validate_regex(value: str) -> str:
SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"]
DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"]
LogSeverityLevel = Literal["alerts", "critical", "debugging", "emergencies", "errors", "informational", "notifications", "warnings"]
4 changes: 2 additions & 2 deletions anta/input_models/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ class BgpRoute(BaseModel):
"""The IPv4 network address."""
vrf: str = "default"
"""Optional VRF for the BGP peer. Defaults to `default`."""
paths: list[BgpRoutePath] | None = None
"""A list of paths for the BGP route. Required field in the `VerifyBGPRouteOrigin` test."""
paths: list[BgpRoutePath]
"""A list of paths for the BGP route."""

def __str__(self) -> str:
"""Return a human-readable string representation of the BgpRoute for reporting.
Expand Down
6 changes: 3 additions & 3 deletions anta/input_models/routing/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ class IPv4Routes(BaseModel):

model_config = ConfigDict(extra="forbid")
prefix: IPv4Network
"""The IPV4 network to validate the route type."""
"""IPv4 prefix in CIDR notation."""
vrf: str = "default"
"""VRF context. Defaults to `default` VRF."""
route_type: IPv4RouteType
"""List of IPV4 Route type to validate the valid rout type."""
route_type: IPv4RouteType | None = None
"""Expected route type. Required field in the `VerifyIPv4RouteType` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the IPv4RouteType for reporting."""
Expand Down
2 changes: 1 addition & 1 deletion anta/reporter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class ReportTable:
"""TableReport Generate a Table based on TestResult."""

@dataclass()
@dataclass
class Headers: # pylint: disable=too-many-instance-attributes
"""Headers for the table report."""

Expand Down
27 changes: 25 additions & 2 deletions anta/tests/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

import re
from ipaddress import IPv4Interface
from typing import Any, ClassVar
from typing import Any, ClassVar, TypeVar

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
from pydantic_extra_types.mac_address import MacAddress

from anta import GITHUB_SUGGESTION
Expand All @@ -23,6 +23,9 @@

BPS_GBPS_CONVERSIONS = 1000000000

# Using a TypeVar for the InterfaceState model since mypy thinks it's a ClassVar and not a valid type when used in field validators
T = TypeVar("T", bound=InterfaceState)


class VerifyInterfaceUtilization(AntaTest):
"""Verifies that the utilization of interfaces is below a certain threshold.
Expand Down Expand Up @@ -226,6 +229,16 @@ class Input(AntaTest.Input):
"""List of interfaces with their expected state."""
InterfaceState: ClassVar[type[InterfaceState]] = InterfaceState

@field_validator("interfaces")
@classmethod
def validate_interfaces(cls, interfaces: list[T]) -> list[T]:
"""Validate that 'status' field is provided in each interface."""
for interface in interfaces:
if interface.status is None:
msg = f"{interface} 'status' field missing in the input"
raise ValueError(msg)
return interfaces

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyInterfacesStatus."""
Expand Down Expand Up @@ -891,6 +904,16 @@ class Input(AntaTest.Input):
"""List of interfaces with their expected state."""
InterfaceState: ClassVar[type[InterfaceState]] = InterfaceState

@field_validator("interfaces")
@classmethod
def validate_interfaces(cls, interfaces: list[T]) -> list[T]:
"""Validate that 'portchannel' field is provided in each interface."""
for interface in interfaces:
if interface.portchannel is None:
msg = f"{interface} 'portchannel' field missing in the input"
raise ValueError(msg)
return interfaces

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyLACPInterfacesStatus."""
Expand Down
101 changes: 62 additions & 39 deletions anta/tests/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
from ipaddress import IPv4Address
from typing import TYPE_CHECKING, ClassVar

from anta.models import AntaCommand, AntaTest
from anta.custom_types import LogSeverityLevel
from anta.models import AntaCommand, AntaTemplate, AntaTest

if TYPE_CHECKING:
import logging

from anta.models import AntaTemplate


def _get_logging_states(logger: logging.Logger, command_output: str) -> str:
"""Parse `show logging` output and gets operational logging states used in the tests in this module.
Expand Down Expand Up @@ -201,35 +200,43 @@ class VerifyLoggingLogsGeneration(AntaTest):
This test performs the following checks:
1. Sends a test log message at the **informational** level
2. Retrieves the most recent logs (last 30 seconds)
3. Verifies that the test message was successfully logged
!!! warning
EOS logging buffer should be set to severity level `informational` or higher for this test to work.
1. Sends a test log message at the specified severity log level.
2. Retrieves the most recent logs (last 30 seconds).
3. Verifies that the test message was successfully logged.
Expected Results
----------------
* Success: If logs are being generated and the test message is found in recent logs.
* Failure: If any of the following occur:
- The test message is not found in recent logs
- The logging system is not capturing new messages
- No logs are being generated
- The test message is not found in recent logs.
- The logging system is not capturing new messages.
- No logs are being generated.
Examples
--------
```yaml
anta.tests.logging:
- VerifyLoggingLogsGeneration:
severity_level: informational
```
"""

categories: ClassVar[list[str]] = ["logging"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaCommand(command="send log level informational message ANTA VerifyLoggingLogsGeneration validation", ofmt="text"),
AntaCommand(command="show logging informational last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
AntaTemplate(template="send log level {severity_level} message ANTA VerifyLoggingLogsGeneration validation", ofmt="text"),
AntaTemplate(template="show logging {severity_level} last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
]

class Input(AntaTest.Input):
"""Input model for the VerifyLoggingLogsGeneration test."""

severity_level: LogSeverityLevel = "informational"
"""Log severity level. Defaults to informational."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for log severity level in the input."""
return [template.render(severity_level=self.inputs.severity_level)]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyLoggingLogsGeneration."""
Expand All @@ -248,37 +255,45 @@ class VerifyLoggingHostname(AntaTest):
This test performs the following checks:
1. Retrieves the device's configured FQDN
2. Sends a test log message at the **informational** level
3. Retrieves the most recent logs (last 30 seconds)
4. Verifies that the test message includes the complete FQDN of the device
!!! warning
EOS logging buffer should be set to severity level `informational` or higher for this test to work.
1. Retrieves the device's configured FQDN.
2. Sends a test log message at the specified severity log level.
3. Retrieves the most recent logs (last 30 seconds).
4. Verifies that the test message includes the complete FQDN of the device.
Expected Results
----------------
* Success: If logs are generated with the device's complete FQDN.
* Failure: If any of the following occur:
- The test message is not found in recent logs
- The log message does not include the device's FQDN
- The FQDN in the log message doesn't match the configured FQDN
- The test message is not found in recent logs.
- The log message does not include the device's FQDN.
- The FQDN in the log message doesn't match the configured FQDN.
Examples
--------
```yaml
anta.tests.logging:
- VerifyLoggingHostname:
severity_level: informational
```
"""

categories: ClassVar[list[str]] = ["logging"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaCommand(command="show hostname", revision=1),
AntaCommand(command="send log level informational message ANTA VerifyLoggingHostname validation", ofmt="text"),
AntaCommand(command="show logging informational last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
AntaTemplate(template="send log level {severity_level} message ANTA VerifyLoggingHostname validation", ofmt="text"),
AntaTemplate(template="show logging {severity_level} last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
]

class Input(AntaTest.Input):
"""Input model for the VerifyLoggingHostname test."""

severity_level: LogSeverityLevel = "informational"
"""Log severity level. Defaults to informational."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for log severity level in the input."""
return [template.render(severity_level=self.inputs.severity_level)]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyLoggingHostname."""
Expand All @@ -303,37 +318,45 @@ class VerifyLoggingTimestamp(AntaTest):
This test performs the following checks:
1. Sends a test log message at the **informational** level
2. Retrieves the most recent logs (last 30 seconds)
3. Verifies that the test message is present with a high-resolution RFC3339 timestamp format
- Example format: `2024-01-25T15:30:45.123456+00:00`
- Includes microsecond precision
- Contains timezone offset
!!! warning
EOS logging buffer should be set to severity level `informational` or higher for this test to work.
1. Sends a test log message at the specified severity log level.
2. Retrieves the most recent logs (last 30 seconds).
3. Verifies that the test message is present with a high-resolution RFC3339 timestamp format.
- Example format: `2024-01-25T15:30:45.123456+00:00`.
- Includes microsecond precision.
- Contains timezone offset.
Expected Results
----------------
* Success: If logs are generated with the correct high-resolution RFC3339 timestamp format.
* Failure: If any of the following occur:
- The test message is not found in recent logs
- The timestamp format does not match the expected RFC3339 format
- The test message is not found in recent logs.
- The timestamp format does not match the expected RFC3339 format.
Examples
--------
```yaml
anta.tests.logging:
- VerifyLoggingTimestamp:
severity_level: informational
```
"""

categories: ClassVar[list[str]] = ["logging"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaCommand(command="send log level informational message ANTA VerifyLoggingTimestamp validation", ofmt="text"),
AntaCommand(command="show logging informational last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
AntaTemplate(template="send log level {severity_level} message ANTA VerifyLoggingTimestamp validation", ofmt="text"),
AntaTemplate(template="show logging {severity_level} last 30 seconds | grep ANTA", ofmt="text", use_cache=False),
]

class Input(AntaTest.Input):
"""Input model for the VerifyLoggingTimestamp test."""

severity_level: LogSeverityLevel = "informational"
"""Log severity level. Defaults to informational."""

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for log severity level in the input."""
return [template.render(severity_level=self.inputs.severity_level)]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyLoggingTimestamp."""
Expand Down
20 changes: 16 additions & 4 deletions anta/tests/routing/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ipaddress import IPv4Address, IPv4Interface
from typing import TYPE_CHECKING, ClassVar, Literal

from pydantic import model_validator
from pydantic import field_validator, model_validator

from anta.custom_types import PositiveInteger
from anta.input_models.routing.generic import IPv4Routes
Expand Down Expand Up @@ -189,9 +189,10 @@ class VerifyIPv4RouteType(AntaTest):
"""Verifies the route-type of the IPv4 prefixes.
This test performs the following checks for each IPv4 route:
1. Verifies that the specified VRF is configured.
2. Verifies that the specified IPv4 route is exists in the configuration.
3. Verifies that the the specified IPv4 route is of the expected type.
1. Verifies that the specified VRF is configured.
2. Verifies that the specified IPv4 route is exists in the configuration.
3. Verifies that the the specified IPv4 route is of the expected type.
Expected Results
----------------
Expand Down Expand Up @@ -230,6 +231,17 @@ class Input(AntaTest.Input):
"""Input model for the VerifyIPv4RouteType test."""

routes_entries: list[IPv4Routes]
"""List of IPv4 route(s)."""

@field_validator("routes_entries")
@classmethod
def validate_routes_entries(cls, routes_entries: list[IPv4Routes]) -> list[IPv4Routes]:
"""Validate that 'route_type' field is provided in each BGP route entry."""
for entry in routes_entries:
if entry.route_type is None:
msg = f"{entry} 'route_type' field missing in the input"
raise ValueError(msg)
return routes_entries

@AntaTest.anta_test
def test(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion docs/templates/python/material/class.html.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
{{ super() }}

{% for dec in class.decorators %}
{% if dec.value.function.name == "deprecated_test_class" %}
{% if dec.value.function is defined and dec.value.function.name == "deprecated_test_class" %}
<img alt="Static Badge" src="https://img.shields.io/badge/DEPRECATED-yellow?style=flat&logoSize=auto">
{% for arg in dec.value.arguments | selectattr("name", "equalto", "removal_in_version") | list %}
<img alt="Static Badge" src="https://img.shields.io/badge/REMOVAL-{{ arg.value[1:-1] }}-grey?style=flat&logoSize=auto&labelColor=red">
Expand Down
3 changes: 3 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ anta.tests.logging:
# Verifies there are no syslog messages with a severity of ERRORS or higher.
- VerifyLoggingHostname:
# Verifies if logs are generated with the device FQDN.
severity_level: informational
- VerifyLoggingHosts:
# Verifies logging hosts (syslog servers) for a specified VRF.
hosts:
Expand All @@ -299,6 +300,7 @@ anta.tests.logging:
vrf: default
- VerifyLoggingLogsGeneration:
# Verifies if logs are generated.
severity_level: informational
- VerifyLoggingPersistent:
# Verifies if logging persistent is enabled and logs are saved in flash.
- VerifyLoggingSourceIntf:
Expand All @@ -307,6 +309,7 @@ anta.tests.logging:
vrf: default
- VerifyLoggingTimestamp:
# Verifies if logs are generated with the appropriate timestamp.
severity_level: informational
- VerifySyslogLogging:
# Verifies if syslog logging is enabled.
anta.tests.mlag:
Expand Down
Loading

0 comments on commit 6ceb261

Please sign in to comment.