diff --git a/anta/device.py b/anta/device.py index ddfb3013a..a0e6fc45e 100644 --- a/anta/device.py +++ b/anta/device.py @@ -80,7 +80,7 @@ def cache_statistics(self) -> dict[str, Any] | None: # Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough # https://github.com/pylint-dev/pylint/issues/7258 if self.cache is not None: - stats = self.cache.hit_miss_ratio # pylint: disable=no-member + stats = getattr(self.cache, "hit_miss_ratio", {"total": 0, "hits": 0, "hit_ratio": 0}) return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{stats['hit_ratio'] * 100:.2f}%"} return None diff --git a/anta/models.py b/anta/models.py index 8265b85c5..13823496a 100644 --- a/anta/models.py +++ b/anta/models.py @@ -8,6 +8,7 @@ import hashlib import logging +import re import time from abc import ABC, abstractmethod from copy import deepcopy @@ -33,6 +34,9 @@ # N = TypeVar("N", bound="AntaTest.Input") +# TODO - make this configurable - with an env var maybe? +BLACKLIST_REGEX = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"] + logger = logging.getLogger(__name__) @@ -423,12 +427,25 @@ def render(self, template: AntaTemplate) -> list[AntaCommand]: no AntaTemplate for this test.""" raise NotImplementedError(f"AntaTemplate are provided but render() method has not been implemented for {self.__module__}.{self.name}") + @property + def blocked(self) -> bool: + """Check if CLI commands contain a blocked keyword.""" + state = False + for command in self.instance_commands: + for pattern in BLACKLIST_REGEX: + if re.match(pattern, command.command): + self.logger.error(f"Command <{command.command}> is blocked for security reason matching {BLACKLIST_REGEX}") + self.result.is_error(f"<{command.command}> is blocked for security reason") + state = True + return state + async def collect(self) -> None: """ Method used to collect outputs of all commands of this test class from the device of this test instance. """ try: - await self.device.collect_commands(self.instance_commands) + if self.blocked is False: + await self.device.collect_commands(self.instance_commands) except Exception as e: # pylint: disable=broad-exception-caught message = f"Exception raised while collecting commands for test {self.name} (on device {self.device.name})" anta_log_exception(e, message, self.logger) diff --git a/anta/runner.py b/anta/runner.py index 023090251..cf725f29e 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -27,8 +27,8 @@ def filter_tags(tags_cli: Union[List[str], None], tags_device: List[str], tags_t async def main( manager: ResultManager, inventory: AntaInventory, - tests: list[tuple[AntaTest, AntaTest.Input]], - tags: list[str], + tests: list[tuple[type[AntaTest], AntaTest.Input]], + tags: Optional[list[str]] = None, established_only: bool = True, ) -> None: """ @@ -46,11 +46,27 @@ async def main( any: ResultManager object gets updated with the test results. """ + if not tests: + logger.info("The list of tests is empty, exiting") + return + + if len(inventory) == 0: + logger.info("The inventory is empty, exiting") + return + await inventory.connect_inventory() # asyncio.gather takes an iterator of the function to run concurrently. # we get the cross product of the devices and tests to build that iterator. devices = inventory.get_inventory(established_only=established_only, tags=tags).values() + + if len(devices) == 0: + logger.info( + f"No device in the established state '{established_only}' " + f"{f'matching the tags {tags} ' if tags else ''}was found. There is no device to run tests against, exiting" + ) + return + coros = [] for device, test in itertools.product(devices, tests): diff --git a/docs/api/models.md b/docs/api/models.md index 662eed657..c0f449486 100644 --- a/docs/api/models.md +++ b/docs/api/models.md @@ -21,6 +21,13 @@ ![](../imgs/uml/anta.models.AntaCommand.jpeg) ### ::: anta.models.AntaCommand +!!! warning + CLI commands are protected to avoid execution of critical commands such as `reload` or `write erase`. + + - Reload command: `^reload\s*\w*` + - Configure mode: `^conf\w*\s*(terminal|session)*` + - Write: `^wr\w*\s*\w+` + # Template definition ## UML Diagram diff --git a/docs/contribution.md b/docs/contribution.md index b6f93b77d..02f4e5cf7 100644 --- a/docs/contribution.md +++ b/docs/contribution.md @@ -26,7 +26,7 @@ $ pip install -e .[dev] $ pip list -e Package Version Editable project location ------- ------- ------------------------- -anta 0.9.0 /mnt/lab/projects/anta +anta 0.10.0 /mnt/lab/projects/anta ``` Then, [`tox`](https://tox.wiki/) is configued with few environments to run CI locally: diff --git a/docs/requirements-and-installation.md b/docs/requirements-and-installation.md index 9367f948c..fc04813ae 100644 --- a/docs/requirements-and-installation.md +++ b/docs/requirements-and-installation.md @@ -61,7 +61,7 @@ which anta ```bash # Check ANTA version anta --version -anta, version v0.9.0 +anta, version v0.10.0 ``` ## EOS Requirements diff --git a/pyproject.toml b/pyproject.toml index 8db223211..d46ccf4c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "anta" -version = "v0.9.0" +version = "v0.10.0" readme = "docs/README.md" authors = [{ name = "Khelil Sator", email = "ksator@arista.com" }] maintainers = [ @@ -52,7 +52,7 @@ requires-python = ">=3.8" [project.optional-dependencies] dev = [ - "bumpver==2023.1126", + "bumpver==2023.1129", "black==23.9.1", "flake8==6.1.0", "isort==5.12.0", @@ -108,7 +108,7 @@ namespaces = false # Version ################################ [tool.bumpver] -current_version = "0.9.0" +current_version = "0.10.0" version_pattern = "MAJOR.MINOR.PATCH" commit_message = "bump: Version {old_version} -> {new_version}" commit = true diff --git a/tests/units/test_device.py b/tests/units/test_device.py index c3101c26d..4f99ecf09 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -186,6 +186,8 @@ class TestAntaDevice: to be able to instantiate the Abstract Class """ + # pylint: disable=abstract-class-instantiated + @patch("anta.device.AntaDevice.__abstractmethods__", set()) @pytest.mark.asyncio @pytest.mark.parametrize("device_data", COLLECT_ANTADEVICE_DATA, ids=generate_test_ids_list(COLLECT_ANTADEVICE_DATA)) @@ -193,7 +195,6 @@ async def test_collect(self, device_data: dict[str, Any]) -> None: """ Test AntaDevice.collect behavior """ - # pylint: disable=abstract-class-instantiated command = AntaCommand(command=device_data["command"]["command"], use_cache=device_data["command"]["use_cache"]) device = AntaDevice(name=device_data["device"]["name"], disable_cache=device_data["device"].get("disable_cache")) # type: ignore[abstract] @@ -231,3 +232,16 @@ def _patched__collect(command: AntaCommand) -> None: else: # device is disabled assert device.cache is None patched__collect.assert_called_once_with(command=command) + + @patch("anta.device.AntaDevice.__abstractmethods__", set()) + @pytest.mark.asyncio + async def test_cache_statistics(self) -> None: + """ + Verify that when cache statistics attribute does not exist + TODO add a test where cache has some value + """ + device = AntaDevice(name="with_cache", disable_cache=False) # type: ignore[abstract] + assert device.cache_statistics == {"total_commands_sent": 0, "cache_hits": 0, "cache_hit_ratio": "0.00%"} + + device = AntaDevice(name="without_cache", disable_cache=True) # type: ignore[abstract] + assert device.cache_statistics is None diff --git a/tests/units/test_models.py b/tests/units/test_models.py index 96af6e7c2..9837aaf29 100644 --- a/tests/units/test_models.py +++ b/tests/units/test_models.py @@ -266,3 +266,29 @@ def test_test(self, mocked_device: MagicMock, data: dict[str, Any]) -> None: # Test that the test() code works as expected if "message" in data["expected"]["test"]: assert data["expected"]["test"]["message"] in test.result.messages + + +ANTATEST_BLACKLIST_DATA = ["reload", "reload --force", "write", "wr mem"] + + +@pytest.mark.parametrize("data", ANTATEST_BLACKLIST_DATA) +def test_blacklist(mocked_device: MagicMock, data: str) -> None: + """Test for blacklisting function.""" + + class FakeTestWithBlacklist(AntaTest): + """Fake Test for blacklist""" + + name = "FakeTestWithBlacklist" + description = "ANTA test that has blacklisted command" + categories = [] + commands = [AntaCommand(command=data)] + + @AntaTest.anta_test + def test(self) -> None: + self.result.is_success() + + test_instance = FakeTestWithBlacklist(mocked_device, inputs=None) + + # Run the test() method + asyncio.run(test_instance.test()) + assert test_instance.result.result == "error" diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py new file mode 100644 index 000000000..b9b598de3 --- /dev/null +++ b/tests/units/test_runner.py @@ -0,0 +1,77 @@ +# Copyright (c) 2023 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +""" +test anta.runner.py +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from anta.inventory import AntaInventory +from anta.models import AntaTest +from anta.result_manager import ResultManager +from anta.runner import main + +if TYPE_CHECKING: + from pytest import LogCaptureFixture + + +@pytest.mark.asyncio +async def test_runner_empty_tests(caplog: LogCaptureFixture, test_inventory: AntaInventory) -> None: + """ + Test that when the list of tests is empty, a log is raised + + caplog is the pytest fixture to capture logs + test_inventory is a fixture that gives a default inventory for tests + """ + manager = ResultManager() + await main(manager, test_inventory, []) + + assert len(caplog.record_tuples) == 1 + assert "The list of tests is empty, exiting" in caplog.records[0].message + + +@pytest.mark.asyncio +async def test_runner_empty_inventory(caplog: LogCaptureFixture) -> None: + """ + Test that when the Inventory is empty, a log is raised + + caplog is the pytest fixture to capture logs + """ + manager = ResultManager() + inventory = AntaInventory() + # This is not vaidated in this test + tests: list[tuple[type[AntaTest], AntaTest.Input]] = [(AntaTest, {})] # type: ignore[type-abstract] + await main(manager, inventory, tests) + + assert len(caplog.record_tuples) == 1 + assert "The inventory is empty, exiting" in caplog.records[0].message + + +@pytest.mark.asyncio +async def test_runner_no_selected_device(caplog: LogCaptureFixture, test_inventory: AntaInventory) -> None: + """ + Test that when the list of established device + + caplog is the pytest fixture to capture logs + test_inventory is a fixture that gives a default inventory for tests + """ + manager = ResultManager() + # This is not vaidated in this test + tests: list[tuple[type[AntaTest], AntaTest.Input]] = [(AntaTest, {})] # type: ignore[type-abstract] + + await main(manager, test_inventory, tests) + + assert "No device in the established state 'True' was found. There is no device to run tests against, exiting" in [record.message for record in caplog.records] + + # Reset logs and run with tags + caplog.clear() + await main(manager, test_inventory, tests, tags=["toto"]) + + assert "No device in the established state 'True' matching the tags ['toto'] was found. There is no device to run tests against, exiting" in [ + record.message for record in caplog.records + ] diff --git a/tests/units/tools/test_misc.py b/tests/units/tools/test_misc.py index 3e53e651d..8c880b620 100644 --- a/tests/units/tools/test_misc.py +++ b/tests/units/tools/test_misc.py @@ -60,7 +60,6 @@ def test_anta_log_exception( caplog.set_level(logging.ERROR, logger=calling_logger.name) else: caplog.set_level(logging.ERROR) - print(caplog.__dict__) # Need to raise to trigger nice stacktrace for __DEBUG__ == True try: my_raising_function(exception)