From 1f1cfcb27f0c68bdf7f89b22497d5813855210a0 Mon Sep 17 00:00:00 2001 From: Carl Montanari Date: Sat, 1 Feb 2020 14:05:46 -0800 Subject: [PATCH] adding functional tests and cleanup --- Makefile | 60 +++++++++- nssh/channel/channel.py | 52 +++++++-- nssh/driver/__init__.py | 3 +- nssh/driver/core/__init__.py | 4 + nssh/driver/core/cisco_iosxe/driver.py | 7 +- nssh/driver/driver.py | 52 ++++++++- .../{core/driver.py => network_driver.py} | 39 +++++-- nssh/helper.py | 3 - nssh/transport/cssh2.py | 1 - nssh/transport/miko.py | 2 +- nssh/transport/ptyprocess.py | 2 - nssh/transport/socket.py | 48 ++++++++ nssh/transport/systemssh.py | 55 ++++++--- nssh/transport/transport.py | 50 +++++++++ setup.cfg | 5 + tests/functional/conftest.py | 41 +++++++ tests/functional/driver/__init__.py | 0 tests/functional/driver/iosxe_helper.py | 46 ++++++++ tests/functional/driver/test_driver.py | 59 ++++++++++ .../test_data/devices/cisco_iosxe.json | 14 +++ .../test_data/test_cases/cisco_iosxe.json | 77 +++++++++++++ tests/unit/channel/test_channel.py | 19 +++- tests/unit/conftest.py | 3 +- .../driver/core/cisco_iosxe/test_driver.py | 6 + tests/unit/driver/test_driver.py | 6 + .../test_driver.py => test_network_driver.py} | 104 +++++++++++++++--- tests/unit/transport/test_socket.py | 15 +++ tox.ini | 3 +- 28 files changed, 708 insertions(+), 68 deletions(-) rename nssh/driver/{core/driver.py => network_driver.py} (88%) create mode 100644 tests/functional/conftest.py create mode 100644 tests/functional/driver/__init__.py create mode 100644 tests/functional/driver/iosxe_helper.py create mode 100644 tests/functional/driver/test_driver.py create mode 100644 tests/functional/test_data/devices/cisco_iosxe.json create mode 100644 tests/functional/test_data/test_cases/cisco_iosxe.json rename tests/unit/driver/{core/test_driver.py => test_network_driver.py} (71%) diff --git a/Makefile b/Makefile index 8c99621b..34c20f5a 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,69 @@ lint: - python -m isort -rc -w 100 -y . + python -m isort -rc -y . python -m black . python -m pylama . python -m pydocstyle . find nssh -type f \( -iname "*.py" ! -iname "ptyprocess.py" \) | xargs darglint +cov: + python -m pytest \ + --cov=nssh \ + --cov-report html \ + --cov-report term \ + tests/ + cov_unit: python -m pytest \ --cov=nssh \ --cov-report html \ --cov-report term \ - tests/unit/ \ No newline at end of file + tests/unit/ + +cov_functional: + python -m pytest \ + --cov=nssh \ + --cov-report html \ + --cov-report term \ + tests/functional/ + +test: + python -m pytest tests/ + +test_unit: + python -m pytest tests/unit/ + +test_functional: + python -m pytest tests/functional/ + +test_iosxe: + python -m pytest -v \ + tests/unit \ + tests/functional/cisco_iosxe + +test_nxos: + python -m pytest -v \ + tests/unit \ + tests/functional/cisco_nxos + +test_iosxr: + python -m pytest -v \ + tests/unit \ + tests/functional/cisco_iosxr + +test_eos: + python -m pytest -v \ + tests/unit \ + tests/functional/arista_eos + +test_junos: + python -m pytest -v \ + tests/unit \ + tests/functional/juniper_junos + +.PHONY: docs +docs: + python -m pdoc \ + --html \ + --output-dir docs \ + nssh \ + --force \ No newline at end of file diff --git a/nssh/channel/channel.py b/nssh/channel/channel.py index f42a6ead..085e2abb 100644 --- a/nssh/channel/channel.py +++ b/nssh/channel/channel.py @@ -54,6 +54,40 @@ def __init__( self.comms_ansi = comms_ansi self.timeout_ops = timeout_ops + def __str__(self) -> str: + """ + Magic str method for Channel + + Args: + N/A # noqa + + Returns: + N/A # noqa + + Raises: + N/A # noqa + + """ + return "nssh Channel Object" + + def __repr__(self) -> str: + """ + Magic repr method for Channel + + Args: + N/A # noqa + + Returns: + repr: repr for class object + + Raises: + N/A # noqa + + """ + class_dict = self.__dict__.copy() + class_dict.pop("transport") + return f"nssh Channel {class_dict}" + def _restructure_output(self, output: bytes, strip_prompt: bool = False) -> bytes: """ Clean up preceding empty lines, and strip prompt if desired @@ -70,8 +104,12 @@ def _restructure_output(self, output: bytes, strip_prompt: bool = False) -> byte """ output = normalize_lines(output) - # purge empty rows before actual output - output = b"\n".join([row for row in output.splitlines() if row]) + + # TODO -- purge empty rows before actual output + # this was used to remove duplicate line feeds in output, but that causes some issues for + # testing where we want to match the normal output we see as users... so i think this + # should be removed -- or optional? + # output = b"\n".join([row for row in output.splitlines() if row]) if not strip_prompt: return output @@ -116,7 +154,6 @@ def _read_until_input(self, channel_input: bytes) -> bytes: """ output = b"" - # TODO -- make sure the appending works same as += (who knows w/ bytes!) while channel_input not in output: output += self._read_chunk() return output @@ -148,7 +185,7 @@ def _read_until_prompt(self, output: bytes = b"", prompt: str = "") -> bytes: # parsing if a prompt-like thing is at the end of the output # TODO -- at one point this was bytes -> str w/ `unicode-escape` have not tested # on many live devices if keeping this all bytes works!!! - output = re.sub(b"\r", b"\n", output.strip()) + output = re.sub(b"\r", b"", output.strip()) channel_match = re.search(prompt_pattern, output) if channel_match: self.transport.set_blocking(True) @@ -209,12 +246,11 @@ def send_inputs( result = Result(self.transport.host, channel_input) raw_result, processed_result = self._send_input(channel_input, strip_prompt) result.raw_result = raw_result.decode() - result.record_result(processed_result.decode()) + result.record_result(processed_result.decode().strip()) results.append(result) return results - # TODO - uncomment! - #@operation_timeout("timeout_ops") + @operation_timeout("timeout_ops") def _send_input(self, channel_input: str, strip_prompt: bool) -> Tuple[bytes, bytes]: """ Send input to device and return results @@ -280,7 +316,7 @@ def send_inputs_interact( channel_input, expectation, response, finale, hidden_response ) result.raw_result = raw_result.decode() - result.record_result(processed_result.decode()) + result.record_result(processed_result.decode().strip()) results.append(result) return results diff --git a/nssh/driver/__init__.py b/nssh/driver/__init__.py index 133a910f..38ba5eb7 100644 --- a/nssh/driver/__init__.py +++ b/nssh/driver/__init__.py @@ -1,4 +1,5 @@ """nssh.driver""" from nssh.driver.driver import NSSH +from nssh.driver.network_driver import NetworkDriver -__all__ = ("NSSH",) +__all__ = ("NSSH", "NetworkDriver") diff --git a/nssh/driver/core/__init__.py b/nssh/driver/core/__init__.py index e69de29b..28f72c24 100644 --- a/nssh/driver/core/__init__.py +++ b/nssh/driver/core/__init__.py @@ -0,0 +1,4 @@ +"""nssh.driver.core""" +from nssh.driver.core.cisco_iosxe.driver import IOSXEDriver + +__all__ = ("IOSXEDriver",) diff --git a/nssh/driver/core/cisco_iosxe/driver.py b/nssh/driver/core/cisco_iosxe/driver.py index 40a40b70..a98bd33c 100644 --- a/nssh/driver/core/cisco_iosxe/driver.py +++ b/nssh/driver/core/cisco_iosxe/driver.py @@ -1,7 +1,8 @@ """nssh.driver.core.cisco_iosxe.driver""" from typing import Any, Dict -from nssh.driver.core.driver import NetworkDriver, PrivilegeLevel +from nssh.driver import NetworkDriver +from nssh.driver.network_driver import PrivilegeLevel PRIVS = { "exec": ( @@ -36,7 +37,7 @@ PrivilegeLevel( r"^[a-z0-9.\-@/:]{1,32}\(config\)#$", "configuration", - "priv", + "privilege_exec", "end", None, None, @@ -50,7 +51,7 @@ PrivilegeLevel( r"^[a-z0-9.\-@/:]{1,32}\(config[a-z0-9.\-@/:]{1,16}\)#$", "special_configuration", - "priv", + "privilege_exec", "end", None, None, diff --git a/nssh/driver/driver.py b/nssh/driver/driver.py index 8b8e806c..5f83d23d 100644 --- a/nssh/driver/driver.py +++ b/nssh/driver/driver.py @@ -2,7 +2,8 @@ import logging import os import re -from typing import Any, Callable, Dict, Tuple, Union +from types import TracebackType +from typing import Any, Callable, Dict, Optional, Tuple, Type, Union from nssh.channel import CHANNEL_ARGS, Channel from nssh.helper import get_external_function, validate_external_function @@ -27,7 +28,6 @@ "paramiko": MIKO_TRANSPORT_ARGS, } - LOG = logging.getLogger("nssh_base") @@ -39,6 +39,7 @@ def __init__( auth_username: str = "", auth_password: str = "", auth_public_key: str = "", + auth_strict_key: bool = True, timeout_socket: int = 5, timeout_ssh: int = 5000, timeout_ops: int = 10, @@ -60,6 +61,7 @@ def __init__( N/A # noqa """ + # TODO -- docstring self.host = host.strip() if not isinstance(port, int): raise TypeError(f"port should be int, got {type(port)}") @@ -67,7 +69,9 @@ def __init__( self.auth_username: str = "" self.auth_password: str = "" - self.auth_public_key: bytes = b"" + if not isinstance(auth_strict_key, bool): + raise TypeError(f"auth_strict_key should be bool, got {type(auth_strict_key)}") + self.auth_strict_key = auth_strict_key self._setup_auth(auth_username, auth_password, auth_public_key) self.timeout_socket = int(timeout_socket) @@ -189,7 +193,7 @@ def _setup_session( except TypeError: self.session_disable_paging = session_disable_paging else: - self.session_disable_paging = session_disable_paging + self.session_disable_paging = "terminal length 0" @staticmethod def _set_session_pre_login_handler( @@ -301,3 +305,43 @@ def close(self) -> None: """ self.transport.close() + + def __enter__(self) -> "NSSH": + """ + Enter method for context manager + + Args: + N/A # noqa + + Returns: + self: instance of self + + Raises: + N/A # noqa + + """ + self.open() + return self + + def __exit__( + self, + exception_type: Optional[Type[BaseException]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """ + Exit method to cleanup for context manager + + Args: + exception_type: exception type being raised + exception_value: message from exception being raised + traceback: traceback from exception being raised + + Returns: + N/A # noqa + + Raises: + N/A # noqa + + """ + self.close() diff --git a/nssh/driver/core/driver.py b/nssh/driver/network_driver.py similarity index 88% rename from nssh/driver/core/driver.py rename to nssh/driver/network_driver.py index a603f11c..08008502 100644 --- a/nssh/driver/core/driver.py +++ b/nssh/driver/network_driver.py @@ -1,13 +1,34 @@ -"""nssh.driver.core.driver""" +"""nssh.base""" import collections +import logging import re from io import TextIOWrapper -from typing import Any, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from nssh import NSSH +from nssh.driver.driver import NSSH from nssh.exceptions import CouldNotAcquirePrivLevel, UnknownPrivLevel from nssh.helper import _textfsm_get_template, get_prompt_pattern, textfsm_parse from nssh.result import Result +from nssh.transport import ( + MIKO_TRANSPORT_ARGS, + SSH2_TRANSPORT_ARGS, + SYSTEM_SSH_TRANSPORT_ARGS, + MikoTransport, + SSH2Transport, + SystemSSHTransport, + Transport, +) + +TRANSPORT_CLASS: Dict[str, Callable[..., Transport]] = { + "system": SystemSSHTransport, + "ssh2": SSH2Transport, + "paramiko": MikoTransport, +} +TRANSPORT_ARGS: Dict[str, Tuple[str, ...]] = { + "system": SYSTEM_SSH_TRANSPORT_ARGS, + "ssh2": SSH2_TRANSPORT_ARGS, + "paramiko": MIKO_TRANSPORT_ARGS, +} PrivilegeLevel = collections.namedtuple( "PrivilegeLevel", @@ -25,6 +46,8 @@ PRIVS: Dict[str, PrivilegeLevel] = {} +LOG = logging.getLogger("nssh_base") + class NetworkDriver(NSSH): def __init__(self, auth_secondary: str = "", **kwargs: Any): @@ -55,14 +78,12 @@ def _determine_current_priv(self, current_prompt: str) -> PrivilegeLevel: current_prompt: string of current prompt Returns: - priv_level: NamedTuple of current privilege level + PrivilegeLevel: NamedTuple of current privilege level Raises: - UnknownPrivLevel: if privilege level cannot be determined # noqa - # NOTE: darglint raises DAR401 for some reason hence the noqa... + UnknownPrivLevel: if privilege level cannot be determined """ - # TODO -- fix above note... for priv_level in self.privs.values(): prompt_pattern = get_prompt_pattern("", priv_level.pattern) if re.search(prompt_pattern, current_prompt.encode()): @@ -126,6 +147,10 @@ def _deescalate(self) -> None: current_priv = self._determine_current_priv(self.channel.get_prompt()) if current_priv.deescalate: next_priv = self.privs.get(current_priv.deescalate_priv, None) + if not next_priv: + raise UnknownPrivLevel( + "NetworkDriver has no default priv levels, set them or use a network driver" + ) self.channel.comms_prompt_pattern = next_priv.pattern self.channel.send_inputs(current_priv.deescalate) diff --git a/nssh/helper.py b/nssh/helper.py index 23747634..a26fe97f 100644 --- a/nssh/helper.py +++ b/nssh/helper.py @@ -138,9 +138,6 @@ def _textfsm_get_template(platform: str, command: str) -> Optional[TextIO]: """ try: from textfsm.clitable import CliTable # pylint: disable=C0415 - - # TODO -- dont think we *need* ntc_templates since we can pass string path to template - import ntc_templates # pylint: disable=C0415,W0611 except ModuleNotFoundError as exc: err = f"Module '{exc.name}' not installed!" msg = f"***** {err} {'*' * (80 - len(err))}" diff --git a/nssh/transport/cssh2.py b/nssh/transport/cssh2.py index 69774843..3453513c 100644 --- a/nssh/transport/cssh2.py +++ b/nssh/transport/cssh2.py @@ -335,7 +335,6 @@ def read(self) -> bytes: """ output: bytes - # TODO should this always be 65535 or empty or...? _, output = self.channel.read(65535) return output diff --git a/nssh/transport/miko.py b/nssh/transport/miko.py index 386a5bbc..170067a1 100644 --- a/nssh/transport/miko.py +++ b/nssh/transport/miko.py @@ -302,7 +302,7 @@ def read(self) -> bytes: N/A # noqa """ - channel_read: bytes = self.channel.recv(1024) + channel_read: bytes = self.channel.recv(65535) return channel_read def write(self, channel_input: str) -> None: diff --git a/nssh/transport/ptyprocess.py b/nssh/transport/ptyprocess.py index c3e94963..ac6c783d 100644 --- a/nssh/transport/ptyprocess.py +++ b/nssh/transport/ptyprocess.py @@ -31,8 +31,6 @@ import sys import termios import time - -# Constants from pty import CHILD, STDIN_FILENO from shutil import which from typing import Type, TypeVar diff --git a/nssh/transport/socket.py b/nssh/transport/socket.py index 01263851..ec2fdef4 100644 --- a/nssh/transport/socket.py +++ b/nssh/transport/socket.py @@ -15,6 +15,54 @@ def __init__(self, host: str, port: int, timeout: int): self.timeout: int = timeout self.sock: Optional[socket.socket] = None + def __bool__(self) -> bool: + """ + Magic bool method for Socket + + Args: + N/A # noqa + + Returns: + bool: True/False if socket is alive or not + + Raises: + N/A # noqa + + """ + return self.socket_isalive() + + def __str__(self) -> str: + """ + Magic str method for Socket + + Args: + N/A # noqa + + Returns: + N/A # noqa + + Raises: + N/A # noqa + + """ + return f"Socket Object for host {self.host}" + + def __repr__(self) -> str: + """ + Magic repr method for Socket + + Args: + N/A # noqa + + Returns: + repr: repr for class object + + Raises: + N/A # noqa + + """ + return f"Socket {self.__dict__}" + def socket_open(self) -> None: """ Open underlying socket diff --git a/nssh/transport/systemssh.py b/nssh/transport/systemssh.py index 4162e45a..7b04757d 100644 --- a/nssh/transport/systemssh.py +++ b/nssh/transport/systemssh.py @@ -4,7 +4,7 @@ from select import select from subprocess import PIPE, Popen from threading import Lock -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, Optional, Union from nssh.decorators import operation_timeout from nssh.exceptions import NSSHAuthenticationFailed @@ -26,6 +26,7 @@ "auth_username", "auth_public_key", "auth_password", + "auth_strict_key", "comms_return_char", ) @@ -39,6 +40,7 @@ def __init__( auth_username: str = "", auth_public_key: str = "", auth_password: str = "", + auth_strict_key: bool = True, comms_prompt_pattern: str = r"^[a-z0-9.\-@()/:]{1,32}[#>$]$", comms_return_char: str = "\n", ): # pylint: disable=W0231 @@ -55,6 +57,7 @@ def __init__( auth_username: username for authentication auth_public_key: path to public key for authentication auth_password: password for authentication + auth_strict_key: True/False to enforce strict key checking (default is True) comms_prompt_pattern: prompt pattern expected for device, same as the one provided to channel -- system ssh needs to know this to know how to decide if we are properly sending/receiving data -- i.e. we are not stuck at some password prompt or some @@ -80,6 +83,7 @@ def __init__( self.auth_username: str = auth_username self.auth_public_key: str = auth_public_key self.auth_password: str = auth_password + self.auth_strict_key: bool = auth_strict_key self.comms_prompt_pattern: str = comms_prompt_pattern self.comms_return_char: str = comms_return_char @@ -87,6 +91,32 @@ def __init__( self.lib_auth_exception = NSSHAuthenticationFailed self._isauthenticated = False + self.open_cmd = ["ssh", self.host] + self._build_open_cmd() + + def _build_open_cmd(self) -> None: + """ + Method to craft command to open ssh session + + Args: + N/A # noqa + + Returns: + N/A # noqa + + Raises: + N/A # noqa + + """ + # TODO -- need to handle ssh config, proxy, other cli args... + self.open_cmd.extend(["-p", str(self.port)]) + if self.auth_public_key: + self.open_cmd.extend(["-i", self.auth_public_key]) + if self.auth_username: + self.open_cmd.extend(["-l", self.auth_username]) + if self.auth_strict_key is False: + self.open_cmd.extend(["-o", "StrictHostKeyChecking=no"]) + def open(self) -> None: """ Parent method to open session, authenticate and acquire shell @@ -102,29 +132,25 @@ def open(self) -> None: """ self.session_lock.acquire() - # TODO -- construct open command -- this means parsing ssh keys and such prior to getting - # here in case users dont want to just rely on their ssh config file - open_cmd = ["ssh", self.host, "-l", self.auth_username] # If authenticating with public key prefer to use open pipes # _open_pipes uses subprocess Popen which is preferable to opening a pty if self.auth_public_key: - open_cmd.extend(["-i", self.auth_public_key]) - if self._open_pipes(open_cmd): + if self._open_pipes(): return # If public key auth fails or is not configured, open a pty session - if not self._open_pty(open_cmd): + if not self._open_pty(): msg = f"Authentication to host {self.host} failed" LOG.critical(msg) raise NSSHAuthenticationFailed(msg) - def _open_pipes(self, open_cmd: List[str]) -> bool: + def _open_pipes(self) -> bool: """ Private method to open session with subprocess.Popen Args: - open_cmd: ssh command string to use to open connection + N/A # noqa Returns: bool: True/False session was opened and authenticated @@ -133,9 +159,9 @@ def _open_pipes(self, open_cmd: List[str]) -> bool: N/A # noqa """ - open_cmd.append("-v") + self.open_cmd.append("-v") pipes_session = Popen( - open_cmd, bufsize=0, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE + self.open_cmd, bufsize=0, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE ) LOG.debug(f"Session to host {self.host} spawned") @@ -171,12 +197,12 @@ def _pipes_isauthenticated(self, pipes_session: PopenBytes) -> bool: self._isauthenticated = True return True - def _open_pty(self, open_cmd: List[str]) -> bool: + def _open_pty(self) -> bool: """ Private method to open session with PtyProcess Args: - open_cmd: ssh command string to use to open connection + N/A # noqa Returns: bool: True/False session was opened and authenticated @@ -185,7 +211,7 @@ def _open_pty(self, open_cmd: List[str]) -> bool: N/A # noqa """ - pty_session = PtyProcess.spawn(open_cmd) + pty_session = PtyProcess.spawn(self.open_cmd) LOG.debug(f"Session to host {self.host} spawned") self.session_lock.release() self._pty_authenticate(pty_session) @@ -329,7 +355,6 @@ def read(self) -> bytes: N/A # noqa """ - # TODO what value should read be...? read_bytes = 65535 if isinstance(self.session, Popen): return self.session.stdout.read(read_bytes) diff --git a/nssh/transport/transport.py b/nssh/transport/transport.py index ee7f07cf..384fc4c5 100644 --- a/nssh/transport/transport.py +++ b/nssh/transport/transport.py @@ -10,6 +10,56 @@ def __init__(self, *args: Union[str, int], **kwargs: Dict[str, Union[str, int]]) self.host: str = "" self.session_lock: Lock = Lock() + def __bool__(self) -> bool: + """ + Magic bool method for Socket + + Args: + N/A # noqa + + Returns: + bool: True/False if socket is alive or not + + Raises: + N/A # noqa + + """ + return self.isalive() + + def __str__(self) -> str: + """ + Magic str method for Transport + + Args: + N/A # noqa + + Returns: + N/A # noqa + + Raises: + N/A # noqa + + """ + return f"Transport Object for host {self.host}" + + def __repr__(self) -> str: + """ + Magic repr method for Transport + + Args: + N/A # noqa + + Returns: + repr: repr for class object + + Raises: + N/A # noqa + + """ + class_dict = self.__dict__.copy() + class_dict["auth_password"] = "********" + return f"Transport {class_dict}" + @abstractmethod def open(self) -> None: """ diff --git a/setup.cfg b/setup.cfg index 2117c569..27908736 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,6 +15,11 @@ rcfile = .pylintrc ignore = D101,D202,D203,D212,D400,D406,D407,D408,D409,D415 match-dir = ^nssh/* +[isort] +line_length = 100 +multi_line_output = 3 +include_trailing_comma = True + [darglint] docstring_style = google strictness = full diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py new file mode 100644 index 00000000..0e1ff91d --- /dev/null +++ b/tests/functional/conftest.py @@ -0,0 +1,41 @@ +import pytest + +from nssh import NSSH + + +@pytest.fixture(scope="module") +def base_driver(): + def _base_driver( + host, + port=22, + auth_username="", + auth_password="", + auth_public_key="", + timeout_socket=1, + timeout_ssh=1000, + timeout_ops=2, + comms_prompt_pattern=r"^[a-z0-9.\-@()/:]{1,32}[#>$]$", + comms_ansi=False, + session_pre_login_handler="", + session_disable_paging="terminal length 0", + driver="system", + ): + conn = NSSH( + host=host, + port=port, + auth_username=auth_username, + auth_password=auth_password, + auth_public_key=auth_public_key, + timeout_socket=timeout_socket, + timeout_ssh=timeout_ssh, + timeout_ops=timeout_ops, + comms_prompt_pattern=comms_prompt_pattern, + comms_ansi=comms_ansi, + session_pre_login_handler=session_pre_login_handler, + session_disable_paging=session_disable_paging, + driver=driver, + ) + conn.open() + return conn + + return _base_driver diff --git a/tests/functional/driver/__init__.py b/tests/functional/driver/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/functional/driver/iosxe_helper.py b/tests/functional/driver/iosxe_helper.py new file mode 100644 index 00000000..d2de3a2d --- /dev/null +++ b/tests/functional/driver/iosxe_helper.py @@ -0,0 +1,46 @@ +import re + + +def _replace_config_bytes(output): + config_bytes_pattern = re.compile(r"^Current configuration : \d+ bytes$", flags=re.M | re.I) + output = re.sub(config_bytes_pattern, "Current configuration : CONFIG_BYTES", output) + return output + + +def _replace_timestamps(output): + datetime_pattern = re.compile( + r"\d+:\d+:\d+\d+\s+[a-z]{3}\s+(mon|tue|wed|thu|fri|sat|sun)\s+(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d+\s+\d+", + flags=re.M | re.I, + ) + output = re.sub(datetime_pattern, "TIME_STAMP_REPLACED", output) + return output + + +def _replace_configured_by(output): + configured_by_pattern = re.compile( + r"^! Last configuration change at TIME_STAMP_REPLACED by (\w+)$", flags=re.M | re.I + ) + output = re.sub( + configured_by_pattern, "! Last configuration change at TIME_STAMP_REPLACED", output + ) + return output + + +def _replace_crypto_strings(output): + crypto_pattern = re.compile( + r"^\s+certificate self-signed.*$\s(^\s{2}(\w+\s){1,8})+\s+quit$", flags=re.M | re.I + ) + output = re.sub(crypto_pattern, "CRYPTO_REPLACED", output) + return output + + +def clean_output_data(test, output): + if test["replace_bytes"]: + output = _replace_config_bytes(output) + if test["replace_timestamps"]: + output = _replace_timestamps(output) + if test["replace_cfg_by"]: + output = _replace_configured_by(output) + if test["replace_crypto"]: + output = _replace_crypto_strings(output) + return output diff --git a/tests/functional/driver/test_driver.py b/tests/functional/driver/test_driver.py new file mode 100644 index 00000000..ffcb18f4 --- /dev/null +++ b/tests/functional/driver/test_driver.py @@ -0,0 +1,59 @@ +import json +from pathlib import Path + +import pytest + +import nssh + +from .iosxe_helper import clean_output_data + +TEST_DATA_PATH = f"{Path(nssh.__file__).parents[1]}/tests/functional/test_data" +with open(f"{TEST_DATA_PATH}/devices/cisco_iosxe.json", "r") as f: + CISCO_IOSXE_DEVICE = json.load(f) +with open(f"{TEST_DATA_PATH}/test_cases/cisco_iosxe.json", "r") as f: + test_cases = json.load(f) + CISCO_IOSXE_TEST_CASES = test_cases["test_cases"] + +TEST_CASES = {"cisco_iosxe": CISCO_IOSXE_TEST_CASES} + + +@pytest.mark.parametrize( + "driver", ["system", "ssh2", "paramiko"], ids=["system", "ssh2", "paramiko"] +) +def test_get_prompt(base_driver, driver): + conn = base_driver(**CISCO_IOSXE_DEVICE, driver=driver) + result = conn.channel.get_prompt() + assert result == "csr1000v#" + + +@pytest.mark.parametrize( + "test", + [t for t in CISCO_IOSXE_TEST_CASES["channel.send_inputs"]["tests"]], + ids=[n["name"] for n in CISCO_IOSXE_TEST_CASES["channel.send_inputs"]["tests"]], +) +@pytest.mark.parametrize( + "driver", ["system", "ssh2", "paramiko"], ids=["system", "ssh2", "paramiko"] +) +def test_channel_send_inputs(base_driver, driver, test): + conn = base_driver(**CISCO_IOSXE_DEVICE, driver=driver) + results = conn.channel.send_inputs(test["inputs"], **test["kwargs"]) + for index, result in enumerate(results): + cleaned_result = clean_output_data(test, result.result) + assert cleaned_result == test["outputs"][index] + conn.close() + + +@pytest.mark.parametrize( + "test", + [t for t in CISCO_IOSXE_TEST_CASES["channel.send_inputs_interact"]["tests"]], + ids=[n["name"] for n in CISCO_IOSXE_TEST_CASES["channel.send_inputs_interact"]["tests"]], +) +@pytest.mark.parametrize( + "driver", ["system", "ssh2", "paramiko"], ids=["system", "ssh2", "paramiko"] +) +def test_channel_send_inputs_interact(base_driver, driver, test): + conn = base_driver(**CISCO_IOSXE_DEVICE, driver=driver) + results = conn.channel.send_inputs_interact(test["inputs"]) + cleaned_result = clean_output_data(test, results[0].result) + assert cleaned_result == test["outputs"][0] + conn.close() diff --git a/tests/functional/test_data/devices/cisco_iosxe.json b/tests/functional/test_data/devices/cisco_iosxe.json new file mode 100644 index 00000000..5f6bc969 --- /dev/null +++ b/tests/functional/test_data/devices/cisco_iosxe.json @@ -0,0 +1,14 @@ +{ + "host": "172.18.0.11", + "port": 22, + "auth_username": "vrnetlab", + "auth_password": "VR-netlab9", + "auth_public_key": "", + "timeout_socket": 1, + "timeout_ssh": 2000, + "timeout_ops": 2, + "comms_prompt_pattern": "^[a-z0-9.\\-@()/:]{1,32}[#>$]$", + "comms_ansi": false, + "session_pre_login_handler": "", + "session_disable_paging": "terminal length 0" +} \ No newline at end of file diff --git a/tests/functional/test_data/test_cases/cisco_iosxe.json b/tests/functional/test_data/test_cases/cisco_iosxe.json new file mode 100644 index 00000000..b3592cd1 --- /dev/null +++ b/tests/functional/test_data/test_cases/cisco_iosxe.json @@ -0,0 +1,77 @@ +{ + "test_cases": { + "channel.send_inputs": { + "tests": [ + { + "name": "send input simple", + "notes": "should always work, no disable paging required, if this is broken things are in bad shape!", + "replace_bytes": false, + "replace_timestamps": false, + "replace_cfg_by": false, + "replace_crypto": false, + "kwargs": {}, + "inputs": [ + "show run | i hostname" + ], + "outputs": [ + "hostname csr1000v" + ] + }, + { + "name": "send input simple, don't strip prompt", + "notes": "should always work, no disable paging required, if this is broken things are in bad shape!", + "replace_bytes": false, + "replace_timestamps": false, + "replace_cfg_by": false, + "replace_crypto": false, + "kwargs": {"strip_prompt": false}, + "inputs": [ + "show run | i hostname" + ], + "outputs": [ + "hostname csr1000v\ncsr1000v#" + ] + }, + { + "name": "disable paging, send inputs", + "notes": "send input test that would require disable paging to have succeeded, base driver does NOT disable paging (or login handler), so we must do that here", + "replace_bytes": true, + "replace_timestamps": true, + "replace_cfg_by": true, + "replace_crypto": true, + "kwargs": {}, + "inputs": [ + "terminal length 0", + "show run" + ], + "outputs": [ + "", + "Building configuration...\nCurrent configuration : CONFIG_BYTES\n!\n! Last configuration change at TIME_STAMP_REPLACED\n!\nversion 16.4\nservice timestamps debug datetime msec\nservice timestamps log datetime msec\nno platform punt-keepalive disable-kernel-core\nplatform console serial\n!\nhostname csr1000v\n!\nboot-start-marker\nboot-end-marker\n!\n!\n!\nno aaa new-model\n!\n!\n!\n!\n!\n!\n!\n!\n!\n\n\n\nip domain name example.com\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\nsubscriber templating\n!\n!\n!\nmultilink bundle-name authenticated\n!\n!\n!\n!\n!\n!\n\n\n!\n!\n!\n!\n!\n!\n!\nlicense udi pid CSR1000V sn 9FKLJWM5EB0\ndiagnostic bootup level minimal\n!\nspanning-tree extend system-id\nnetconf-yang cisco-odm actions ACL\nnetconf-yang cisco-odm actions BGP\nnetconf-yang cisco-odm actions OSPF\nnetconf-yang cisco-odm actions Archive\nnetconf-yang cisco-odm actions IPRoute\nnetconf-yang cisco-odm actions EFPStats\nnetconf-yang cisco-odm actions IPSLAStats\nnetconf-yang cisco-odm actions Interfaces\nnetconf-yang cisco-odm actions Environment\nnetconf-yang cisco-odm actions FlowMonitor\nnetconf-yang cisco-odm actions MemoryStats\nnetconf-yang cisco-odm actions BFDNeighbors\nnetconf-yang cisco-odm actions BridgeDomain\nnetconf-yang cisco-odm actions CPUProcesses\nnetconf-yang cisco-odm actions LLDPNeighbors\nnetconf-yang cisco-odm actions VirtualService\nnetconf-yang cisco-odm actions MemoryProcesses\nnetconf-yang cisco-odm actions EthernetCFMStats\nnetconf-yang cisco-odm actions MPLSLDPNeighbors\nnetconf-yang cisco-odm actions PlatformSoftware\nnetconf-yang cisco-odm actions MPLSStaticBinding\nnetconf-yang cisco-odm actions MPLSForwardingTable\nnetconf-yang\n!\nrestconf\n!\nusername vrnetlab privilege 15 password 0 VR-netlab9\n!\nredundancy\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\n!\ninterface GigabitEthernet1\n ip address 10.0.0.15 255.255.255.0\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet2\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet3\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet4\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet5\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet6\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet7\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet8\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet9\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\ninterface GigabitEthernet10\n no ip address\n shutdown\n negotiation auto\n no mop enabled\n no mop sysid\n!\n!\nvirtual-service csr_mgmt\n!\nip forward-protocol nd\nno ip http server\nno ip http secure-server\n!\n!\n!\n!\n!\n!\n!\ncontrol-plane\n!\n !\n !\n !\n !\n!\n!\n!\n!\n!\nline con 0\n stopbits 1\nline vty 0\n login local\n transport input all\nline vty 1\n login local\n length 0\n transport input all\nline vty 2 4\n login local\n transport input all\n!\n!\n!\n!\n!\n!\nend" + ] + } + ] + }, + "channel.send_inputs_interact": { + "tests": [ + { + "name": "send interactive input", + "notes": "", + "replace_bytes": false, + "replace_timestamps": false, + "replace_cfg_by": false, + "replace_crypto": false, + "kwargs": {}, + "inputs": [ + "clear logg", + "Clear logging buffer [confirm]", + "", + "csr1000v#" + ], + "outputs": [ + "Clear logging buffer [confirm]\ncsr1000v#" + ] + } + ] + } + } +} \ No newline at end of file diff --git a/tests/unit/channel/test_channel.py b/tests/unit/channel/test_channel.py index ace6edd0..8f30a4b9 100644 --- a/tests/unit/channel/test_channel.py +++ b/tests/unit/channel/test_channel.py @@ -1,6 +1,19 @@ import pytest +def test__str(mocked_channel): + conn = mocked_channel([]) + assert str(conn.channel) == "nssh Channel Object" + + +def test__repr(mocked_channel): + conn = mocked_channel([]) + assert ( + repr(conn.channel) + == r"nssh Channel {'comms_prompt_pattern': '^[a-z0-9.\\-@()/:]{1,32}[#>$]$', 'comms_return_char': '\n', 'comms_ansi': False, 'timeout_ops': 10}" + ) + + def test__restructure_output_strip_prompt(mocked_channel): channel_output_1 = b"hostname 3560CX\r\n3560CX#" conn = mocked_channel([]) @@ -19,21 +32,21 @@ def test__read_until_prompt_default_pattern(mocked_channel): channel_output_1 = b"!\r\nntp server 172.31.255.1 prefer\r\n!\r\nend\r\n\r\n" conn = mocked_channel([]) output = conn.channel._read_until_prompt(channel_output_1) - assert output == b"!\n\nntp server 172.31.255.1 prefer\n\n!\n\nend\n\n\n\n3560CX#" + assert output == b"!\nntp server 172.31.255.1 prefer\n!\nend\n\n3560CX#" def test__read_until_prompt_regex_pattern(mocked_channel): channel_output_1 = b"!\r\nntp server 172.31.255.1 prefer\r\n!\r\nend\r\n\r\n" conn = mocked_channel([], comms_prompt_pattern="^3560CX#$") output = conn.channel._read_until_prompt(channel_output_1) - assert output == b"!\n\nntp server 172.31.255.1 prefer\n\n!\n\nend\n\n\n\n3560CX#" + assert output == b"!\nntp server 172.31.255.1 prefer\n!\nend\n\n3560CX#" def test__read_until_prompt_string_pattern(mocked_channel): channel_output_1 = b"!\r\nntp server 172.31.255.1 prefer\r\n!\r\nend\r\n\r\n" conn = mocked_channel([], comms_prompt_pattern="3560CX#") output = conn.channel._read_until_prompt(channel_output_1) - assert output == b"!\n\nntp server 172.31.255.1 prefer\n\n!\n\nend\n\n\n\n3560CX#" + assert output == b"!\nntp server 172.31.255.1 prefer\n!\nend\n\n3560CX#" # TODO i may not be stripping ansi from get prompt which could break shit w/ ansi in the prompt diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 056110dd..2de438e0 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -4,9 +4,8 @@ import pytest -from nssh.driver import NSSH +from nssh.driver import NSSH, NetworkDriver from nssh.driver.core.cisco_iosxe.driver import PRIVS -from nssh.driver.core.driver import NetworkDriver from nssh.transport.transport import Transport diff --git a/tests/unit/driver/core/cisco_iosxe/test_driver.py b/tests/unit/driver/core/cisco_iosxe/test_driver.py index e69de29b..3674a794 100644 --- a/tests/unit/driver/core/cisco_iosxe/test_driver.py +++ b/tests/unit/driver/core/cisco_iosxe/test_driver.py @@ -0,0 +1,6 @@ +from nssh.driver.core.cisco_iosxe.driver import IOSXEDriver + + +def test_init(): + conn = IOSXEDriver("enable-pass") + assert conn.auth_secondary == "enable-pass" diff --git a/tests/unit/driver/test_driver.py b/tests/unit/driver/test_driver.py index 4e2a3aaa..e00bbf09 100644 --- a/tests/unit/driver/test_driver.py +++ b/tests/unit/driver/test_driver.py @@ -74,6 +74,12 @@ def test_auth_password_strip(): assert conn.auth_password == "password" +def test_auth_strict_key_invalid(): + with pytest.raises(TypeError) as e: + NSSH(auth_strict_key="notreal") + assert str(e.value) == "auth_strict_key should be bool, got " + + def test_valid_comms_return_char(): conn = NSSH(comms_return_char="\rn") assert conn.comms_return_char == "\rn" diff --git a/tests/unit/driver/core/test_driver.py b/tests/unit/driver/test_network_driver.py similarity index 71% rename from tests/unit/driver/core/test_driver.py rename to tests/unit/driver/test_network_driver.py index b0eba331..dd0d26e4 100644 --- a/tests/unit/driver/core/test_driver.py +++ b/tests/unit/driver/test_network_driver.py @@ -2,10 +2,9 @@ import pytest -from nssh.driver.core.driver import PrivilegeLevel +from nssh.driver.network_driver import PrivilegeLevel from nssh.exceptions import CouldNotAcquirePrivLevel, UnknownPrivLevel - try: import ntc_templates import txtfsm @@ -187,6 +186,26 @@ def test_acquire_priv(mocked_network_driver): conn.acquire_priv("configuration") +def test_acquire_priv_deescalate(mocked_network_driver): + channel_input_1 = "\n" + channel_output_1 = "3560CX(config)#" + channel_input_2 = "\n" + channel_output_2 = "3560CX(config)#" + channel_input_3 = "end" + channel_output_3 = "3560CX#" + channel_input_4 = "\n" + channel_output_4 = "3560CX#" + channel_ops = [ + (channel_input_1, channel_output_1), + (channel_input_2, channel_output_2), + (channel_input_3, channel_output_3), + (channel_input_4, channel_output_4), + ] + + conn = mocked_network_driver(channel_ops) + conn.acquire_priv("privilege_exec") + + def test_acquire_priv_could_not_acquire_priv(mocked_network_driver): channel_input_1 = "\n" channel_output_1 = "\n3560CX>" @@ -217,12 +236,6 @@ def test_acquire_priv_could_not_acquire_priv(mocked_network_driver): channel_input_13 = "\n" channel_output_13 = "\n3560CX>" - # channel_input_14 = "\n" - # channel_output_14 = "3560CX>" - # channel_input_15 = "enable" - # channel_output_15 = "Password: " - # channel_input_16 = "password123" - # channel_output_16 = "3560CX>" channel_ops = [ (channel_input_1, channel_output_1), @@ -238,10 +251,6 @@ def test_acquire_priv_could_not_acquire_priv(mocked_network_driver): (channel_input_11, channel_output_11), (channel_input_12, channel_output_12), (channel_input_13, channel_output_13), - # (channel_input_14, channel_output_14), - # (channel_input_15, channel_output_15), - # (channel_input_16, channel_output_16), - # (channel_input_17, channel_output_17), ] conn = mocked_network_driver(channel_ops) @@ -277,11 +286,9 @@ def test_acquire_priv_could_not_acquire_priv(mocked_network_driver): } conn.privs = mock_privs - # TODO - just need to trick this into counting too many escalate attempts - def _mock_escalate(self): self.__class__._escalate(self) - self.channel.comms_prompt_pattern = mock_privs['exec'].pattern + self.channel.comms_prompt_pattern = mock_privs["exec"].pattern def _mock_get_prompt(): return "3560CX>" @@ -292,3 +299,70 @@ def _mock_get_prompt(): with pytest.raises(CouldNotAcquirePrivLevel) as exc: conn.acquire_priv("privilege_exec") assert str(exc.value) == "Could not get to 'privilege_exec' privilege level." + + +def test_send_commands(mocked_network_driver): + channel_input_1 = "\n" + channel_output_1 = "\n3560CX#" + channel_input_2 = "show ip access-lists" + channel_output_2 = """Extended IP access list ext_acl_fw + 10 deny ip 0.0.0.0 0.255.255.255 any + 20 deny ip 10.0.0.0 0.255.255.255 any + 30 deny ip 100.64.0.0 0.63.255.255 any (2 matches) + 40 deny ip 127.0.0.0 0.255.255.255 any + 50 deny ip 169.254.0.0 0.0.255.255 any + 60 deny ip 172.16.0.0 0.15.255.255 any + 70 deny ip 192.0.0.0 0.0.0.255 any + 80 deny ip 192.0.2.0 0.0.0.255 any + 90 deny ip 192.168.0.0 0.0.255.255 any + 100 deny ip 198.18.0.0 0.1.255.255 any + 110 deny ip 198.51.100.0 0.0.0.255 any + 120 deny ip 203.0.113.0 0.0.0.255 any + 130 deny ip 224.0.0.0 15.255.255.255 any + 140 deny ip 240.0.0.0 15.255.255.255 any +3560CX#""" + test_operations = [(channel_input_1, channel_output_1), (channel_input_2, channel_output_2)] + conn = mocked_network_driver(test_operations) + conn.default_desired_priv = "privilege_exec" + output = conn.send_commands(channel_input_2, strip_prompt=False) + assert output[0].result == channel_output_2 + + +def test_send_configs(mocked_network_driver): + channel_input_1 = "\n" + channel_output_1 = "\n3560CX#" + channel_input_2 = "\n" + channel_output_2 = "\n3560CX#" + channel_input_3 = "configure terminal" + channel_output_3 = """Enter configuration commands, one per line. End with CNTL/Z. +3560CX(config)#""" + channel_input_4 = "\n" + channel_output_4 = "3560CX(config)#" + channel_input_5 = "hostname XC0653" + channel_output_5 = "XC0653(config)#" + channel_input_6 = "\n" + channel_output_6 = "XC0653(config)#" + channel_input_7 = "\n" + channel_output_7 = "XC0653(config)#" + channel_input_8 = "end" + channel_output_8 = "3560CX#" + channel_input_9 = "\n" + channel_output_9 = "\n3560CX#" + test_operations = [ + (channel_input_1, channel_output_1), + (channel_input_2, channel_output_2), + (channel_input_3, channel_output_3), + (channel_input_4, channel_output_4), + (channel_input_5, channel_output_5), + (channel_input_6, channel_output_6), + (channel_input_7, channel_output_7), + (channel_input_8, channel_output_8), + (channel_input_9, channel_output_9), + ] + conn = mocked_network_driver(test_operations) + conn.default_desired_priv = "privilege_exec" + output = conn.send_configs(channel_input_5, strip_prompt=False) + assert output[0].result == channel_output_5 + + +# TODO -- add textfsm parsing test diff --git a/tests/unit/transport/test_socket.py b/tests/unit/transport/test_socket.py index c2b66b3c..28bc3855 100644 --- a/tests/unit/transport/test_socket.py +++ b/tests/unit/transport/test_socket.py @@ -35,3 +35,18 @@ def test_socket_close_success(): def test_socket_isalive_false(): sock = Socket("localhost", 22, 1) assert sock.socket_isalive() is False + + +def test__str(): + sock = Socket("localhost", 22, 1) + assert str(sock) == "Socket Object for host localhost" + + +def test__repr(): + sock = Socket("localhost", 22, 1) + assert repr(sock) == "Socket {'host': 'localhost', 'port': 22, 'timeout': 1, 'sock': None}" + + +def test__bool(): + sock = Socket("localhost", 22, 1) + assert bool(sock) is False diff --git a/tox.ini b/tox.ini index cc65241e..0b471662 100644 --- a/tox.ini +++ b/tox.ini @@ -14,8 +14,9 @@ commands = --cov-report html \ --cov-report term \ tests/unit/. - python -m isort -rc -w 100 -y . + python -m isort -rc -y . python -m black . python -m pylama . python -m pydocstyle . + python -m mypy --strict nssh/ bash -c 'find nssh -type f \( -iname "*.py" ! -iname "ptyprocess.py" \) | xargs darglint'