diff --git a/netengine/backends/base.py b/netengine/backends/base.py index 86509b0..fdd414d 100644 --- a/netengine/backends/base.py +++ b/netengine/backends/base.py @@ -21,14 +21,14 @@ def __repr__(self): """returns unicode string represantation""" return self.__str__() - def validate(self): + def validate(self, *args, **kwargs): raise NotImplementedError('Not implemented') - def to_dict(self): + def to_dict(self, autowalk=True): raise NotImplementedError('Not implemented') - def to_json(self, **kwargs): - dictionary = self.to_dict() + def to_json(self, autowalk=True, **kwargs): + dictionary = self.to_dict(autowalk=autowalk) return json.dumps(dictionary, **kwargs) @property diff --git a/netengine/backends/dummy.py b/netengine/backends/dummy.py index 2173234..5db8597 100644 --- a/netengine/backends/dummy.py +++ b/netengine/backends/dummy.py @@ -63,7 +63,7 @@ def get_interfaces(self): }, ] - def to_dict(self): + def to_dict(self, *args, **kwargs): return self._dict( { 'name': 'dummy', diff --git a/netengine/backends/snmp/airos.py b/netengine/backends/snmp/airos.py index 78ac856..5145439 100644 --- a/netengine/backends/snmp/airos.py +++ b/netengine/backends/snmp/airos.py @@ -9,6 +9,8 @@ import logging from datetime import datetime +from pytrie import StringTrie as Trie + from .base import SNMP logger = logging.getLogger(__name__) @@ -19,56 +21,54 @@ class AirOS(SNMP): Ubiquiti AirOS SNMP backend """ - _oid_to_retrieve = '1.3.6.1.2.1.1.9.1.1' + _oid_to_retrieve = '1.3.6.1.2.1.1.9.1.1.' - def __str__(self): + def __str__(self, snmpdump=None): """print a human readable object description""" return f'' - def validate(self): + def validate(self, snmpdump=None): """ raises NetEngineError exception if anything is wrong with the connection for example: wrong host, invalid community """ # this triggers a connection which # will raise an exception if anything is wrong - return self.name + return self.name(snmpdump=snmpdump) - @property - def os(self): + def os(self, snmpdump=None): """ returns (os_name, os_version) """ os_name = 'AirOS' - os_version = self.get_value('1.3.6.1.2.1.1.1.0').split('#')[0].strip() + os_version = ( + self.get_value('1.3.6.1.2.1.1.1.0', snmpdump=snmpdump).split('#')[0].strip() + ) return os_name, os_version - @property - def name(self): + def name(self, snmpdump=None): """ returns a string containing the device name """ - return self.get_value('1.3.6.1.2.1.1.5.0') + return self.get_value('1.3.6.1.2.1.1.5.0', snmpdump=snmpdump) - @property - def model(self): + def model(self, snmpdump=None): """ returns a string containing the device model """ oids = ['1.2.840.10036.3.1.2.1.3.5', '1.2.840.10036.3.1.2.1.3.8'] for oid in oids: - model = self.get_value(oid) + model = self.get_value(oid, snmpdump=snmpdump) if model != '': return model - @property - def firmware(self): + def firmware(self, snmpdump=None): """ returns a string containing the device firmware """ oids = ['1.2.840.10036.3.1.2.1.4.5', '1.2.840.10036.3.1.2.1.4.8'] for oid in oids: - tmp = self.get_value(oid).split('.') + tmp = self.get_value(oid, snmpdump=snmpdump).split('.') if tmp is not None: length = len(tmp) i = 0 @@ -77,37 +77,35 @@ def firmware(self): return 'AirOS ' + '.'.join(tmp[i:length]) i = i + 1 - @property - def manufacturer(self): - return self.get_manufacturer(self.interfaces_MAC[1]['mac_address']) + def manufacturer(self, snmpdump=None): + return self.get_manufacturer( + self.interfaces_MAC(snmpdump=snmpdump)[1]['mac_address'] + ) - @property - def ssid(self): + def ssid(self, snmpdump=None): """ returns a string containing the wireless ssid """ oids = ['1.2.840.10036.1.1.1.9.5', '1.2.840.10036.1.1.1.9.8'] for oid in oids: - if self.get_value(oid) != '': - return self.get_value(oid) + if self.get_value(oid, snmpdump=snmpdump) != '': + return self.get_value(oid, snmpdump=snmpdump) - @property - def uptime(self): + def uptime(self, snmpdump=None): """ returns an integer representing the number of seconds of uptime """ - return int(self.get_value('1.3.6.1.2.1.1.3.0')) // 100 + return int(self.get_value('1.3.6.1.2.1.1.3.0', snmpdump=snmpdump)) // 100 - @property - def interfaces_number(self): + def interfaces_number(self, snmpdump=None): """ Returns the number of the network interfaces """ - return int(self.get_value('1.3.6.1.2.1.2.1.0')) + return int(self.get_value('1.3.6.1.2.1.2.1.0', snmpdump=snmpdump)) _interfaces = None - def get_interfaces(self): + def get_interfaces(self, snmpdump=None): """ returns the list of all the interfaces of the device """ @@ -115,10 +113,10 @@ def get_interfaces(self): interfaces = [] value_to_get = '1.3.6.1.2.1.2.2.1.2.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): value_to_get1 = value_to_get + str(i) if value_to_get1: - interfaces.append(self.get_value(value_to_get1)) + interfaces.append(self.get_value(value_to_get1, snmpdump=snmpdump)) self._interfaces = interfaces @@ -126,8 +124,7 @@ def get_interfaces(self): _interfaces_mtu = None - @property - def interfaces_mtu(self): + def interfaces_mtu(self, snmpdump=None): """ Returns an ordereed dict with the interface and its MTU """ @@ -138,11 +135,11 @@ def interfaces_mtu(self): tmp[18] = str(4) to = ''.join(tmp) - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'mtu': int(self.get_value(to + str(i))), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'mtu': int(self.get_value(to + str(i), snmpdump=snmpdump)), } ) results.append(result) @@ -153,8 +150,7 @@ def interfaces_mtu(self): _interfaces_state = None - @property - def interfaces_state(self): + def interfaces_state(self, snmpdump=None): """ Returns an ordereed dict with the interfaces and their state (up, down) """ @@ -164,20 +160,27 @@ def interfaces_state(self): operative = '1.3.6.1.2.1.2.2.1.8.' tmp = list(starting) tmp[18] = str(4) - for i in self._value_to_retrieve(): - if self.get_value(starting + str(i)) != '': - if int(self.get_value(operative + str(i))) == 1: + for i in self._value_to_retrieve(snmpdump=snmpdump): + if self.get_value(starting + str(i), snmpdump=snmpdump) != '': + if int(self.get_value(operative + str(i), snmpdump=snmpdump)) == 1: result = self._dict( - {'name': self.get_value(starting + str(i)), 'state': 'up'} + { + 'name': self.get_value( + starting + str(i), snmpdump=snmpdump + ), + 'state': 'up', + } ) else: result = self._dict( { - 'name': self.get_value(starting + str(i)), + 'name': self.get_value( + starting + str(i), snmpdump=snmpdump + ), 'state': 'down', } ) - elif self.get_value(starting + str(i)) == '': + elif self.get_value(starting + str(i), snmpdump=snmpdump) == '': result = self._dict({'name': '', 'state': ''}) # append result to list results.append(result) @@ -188,8 +191,7 @@ def interfaces_state(self): _interfaces_speed = None - @property - def interfaces_speed(self): + def interfaces_speed(self, snmpdump=None): """ Returns an ordered dict with the interface and ist speed in bps """ @@ -198,11 +200,13 @@ def interfaces_speed(self): starting = '1.3.6.1.2.1.2.2.1.2.' starting_speed = '1.3.6.1.2.1.2.2.1.5.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'speed': int(self.get_value(starting_speed + str(i))), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'speed': int( + self.get_value(starting_speed + str(i), snmpdump=snmpdump) + ), } ) results.append(result) @@ -213,8 +217,7 @@ def interfaces_speed(self): _interfaces_bytes = None - @property - def interfaces_bytes(self): + def interfaces_bytes(self, snmpdump=None): """ Returns an ordereed dict with the interface and its tx and rx octets (1 octet = 1 byte = 8 bits) """ @@ -224,12 +227,16 @@ def interfaces_bytes(self): starting_rx = '1.3.6.1.2.1.2.2.1.10.' starting_tx = '1.3.6.1.2.1.2.2.1.16.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'tx': int(self.get_value(starting_tx + str(i))), - 'rx': int(self.get_value(starting_rx + str(i))), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'tx': int( + self.get_value(starting_tx + str(i), snmpdump=snmpdump) + ), + 'rx': int( + self.get_value(starting_rx + str(i), snmpdump=snmpdump) + ), } ) results.append(result) @@ -239,8 +246,7 @@ def interfaces_bytes(self): _interfaces_MAC = None - @property - def interfaces_MAC(self): + def interfaces_MAC(self, snmpdump=None): """ Returns an ordered dict with the hardware address of every interface """ @@ -249,9 +255,9 @@ def interfaces_MAC(self): starting = '1.3.6.1.2.1.2.2.1.2.' starting_mac = '1.3.6.1.2.1.2.2.1.6.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): mac = binascii.b2a_hex( - self.get_value(starting_mac + str(i)).encode() + self.get_value(starting_mac + str(i), snmpdump=snmpdump).encode() ).decode() # now we are going to format mac as the canonical way as a MAC # address is intended by inserting ':' every two chars of mac @@ -261,7 +267,7 @@ def interfaces_MAC(self): ) result = self._dict( { - 'name': self.get_value(starting + str(i)), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), 'mac_address': mac_transformed, } ) @@ -273,8 +279,7 @@ def interfaces_MAC(self): _interfaces_type = None - @property - def interfaces_type(self): + def interfaces_type(self, snmpdump=None): """ Returns an ordered dict with the interface type (e.g Ethernet, loopback) """ @@ -284,11 +289,13 @@ def interfaces_type(self): starting = '1.3.6.1.2.1.2.2.1.2.' types_oid = '1.3.6.1.2.1.2.2.1.3.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'type': types[self.get_value(types_oid + str(i))], + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'type': types[ + self.get_value(types_oid + str(i), snmpdump=snmpdump) + ], } ) results.append(result) @@ -297,45 +304,48 @@ def interfaces_type(self): return self._interfaces_type - @property - def interfaces_to_dict(self): + def interfaces_to_dict(self, snmpdump=None): """ Returns an ordered dict with all the information available about the interface """ results = [] - for i in range(0, len(self.get_interfaces())): + for i in range(0, len(self.get_interfaces(snmpdump=snmpdump))): logger.info(f'===== {i} =====') result = self._dict( { - 'name': self.interfaces_MAC[i]['name'], + 'name': self.interfaces_MAC(snmpdump=snmpdump)[i]['name'], 'statistics': { - 'rx_bytes': int(self.interfaces_bytes[i]['rx']), - 'tx_bytes': int(self.interfaces_bytes[i]['tx']), + 'rx_bytes': int( + self.interfaces_bytes(snmpdump=snmpdump)[i]['rx'] + ), + 'tx_bytes': int( + self.interfaces_bytes(snmpdump=snmpdump)[i]['tx'] + ), }, } ) results.append(result) return results - @property - def wireless_dbm(self): + def wireless_dbm(self, snmpdump=None): """ returns a list with the wireless signal (dbm) of the link/s """ - res = self.next('1.3.6.1.4.1.14988.1.1.1.2.1.3.0') + res = self.next('1.3.6.1.4.1.14988.1.1.1.2.1.3.0.', snmpdump=snmpdump) dbm = [] for i in range(0, len(res[3])): dbm.append(int(res[3][i][0][1])) return dbm - @property - def wireless_links(self): + def wireless_links(self, snmpdump=None): ''' Returns an ordered dict with all the infos about the wireless link/s ''' final = [] - results = self.next('1.3.6.1.4.1.14988.1.1.1.2.1') - link_number = len(self.next('1.3.6.1.4.1.14988.1.1.1.2.1.3')[3]) + results = self.next('1.3.6.1.4.1.14988.1.1.1.2.1.', snmpdump=snmpdump) + link_number = len( + self.next('1.3.6.1.4.1.14988.1.1.1.2.1.3.', snmpdump=snmpdump)[3] + ) separated_by_meaning = [] dbm = [] tx_bytes = [] @@ -372,100 +382,100 @@ def wireless_links(self): final.append(result) return final - @property - def local_time(self): + def local_time(self, snmpdump=None): """ returns the local time of the host device as a timestamp """ - epoch = str(self.get('1.3.6.1.4.1.41112.1.4.8.1.0')[3][0][1]) + epoch = str(self.get('1.3.6.1.4.1.41112.1.4.8.1.0', snmpdump=snmpdump)[3][0][1]) timestamp = int(datetime.strptime(epoch, '%Y-%m-%d %H:%M:%S').timestamp()) return timestamp - @property - def RAM_total(self): + def RAM_total(self, snmpdump=None): """ Returns the total RAM of the device """ - total = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.1.0') + total = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.1.0', snmpdump=snmpdump) return int(total) - @property - def RAM_free(self): + def RAM_free(self, snmpdump=None): """ Returns the free RAM of the device """ - free = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.2.0') + free = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.2.0', snmpdump=snmpdump) return int(free) - @property - def RAM_buffered(self): + def RAM_buffered(self, snmpdump=None): """ Returns the buffered RAM of the device """ - buffered = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.3.0') + buffered = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.3.0', snmpdump=snmpdump) return int(buffered) - @property - def RAM_cached(self): + def RAM_cached(self, snmpdump=None): """ Returns the cached RAM of the device """ - cached = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.4.0') + cached = self.get_value('1.3.6.1.4.1.10002.1.1.1.1.4.0', snmpdump=snmpdump) return int(cached) - @property - def load(self): + def load(self, snmpdump=None): """ Returns an array with load average values respectively in the last minute, in the last 5 minutes and in the last 15 minutes """ - array = (self.next('1.3.6.1.4.1.10002.1.1.1.4.2.1.3'))[3] + array = self.next('1.3.6.1.4.1.10002.1.1.1.4.2.1.3.', snmpdump=snmpdump)[3] one = int(array[0][0][1]) five = int(array[1][0][1]) fifteen = int(array[2][0][1]) return [one, five, fifteen] - @property - def SWAP_total(self): + def SWAP_total(self, snmpdump=None): """ Returns the total SWAP of the device """ - total = self.get_value('1.3.6.1.4.1.10002.1.1.1.2.1.0') + total = self.get_value('1.3.6.1.4.1.10002.1.1.1.2.1.0', snmpdump=snmpdump) return int(total) - @property - def SWAP_free(self): + def SWAP_free(self, snmpdump=None): """ Returns the free SWAP of the device """ - free = self.get_value('1.3.6.1.4.1.10002.1.1.1.2.2.0') + free = self.get_value('1.3.6.1.4.1.10002.1.1.1.2.2.0', snmpdump=snmpdump) return int(free) - @property - def resources_to_dict(self): + def resources_to_dict(self, snmpdump=None): """ returns an ordered dict with hardware resources information """ result = self._dict( { - 'load': self.load, + 'load': self.load(snmpdump=snmpdump), 'memory': { - 'total': self.RAM_total, - 'buffered': self.RAM_buffered, - 'free': self.RAM_free, - 'cached': self.RAM_cached, + 'total': self.RAM_total(snmpdump=snmpdump), + 'buffered': self.RAM_buffered(snmpdump=snmpdump), + 'free': self.RAM_free(snmpdump=snmpdump), + 'cached': self.RAM_cached(snmpdump=snmpdump), + }, + 'swap': { + 'total': self.SWAP_total(snmpdump=snmpdump), + 'free': self.SWAP_free(snmpdump=snmpdump), }, - 'swap': {'total': self.SWAP_total, 'free': self.SWAP_free}, } ) return result - def to_dict(self): - return self._dict( + def to_dict(self, snmpdump=None, autowalk=True): + if autowalk: + snmpdump = Trie(self.walk('1.3.6')) + result = self._dict( { 'type': 'DeviceMonitoring', - 'general': {'uptime': self.uptime, 'local_time': self.local_time}, - 'resources': self.resources_to_dict, - 'interfaces': self.interfaces_to_dict, + 'general': { + 'uptime': self.uptime(snmpdump=snmpdump), + 'local_time': self.local_time(snmpdump=snmpdump), + }, + 'resources': self.resources_to_dict(snmpdump=snmpdump), + 'interfaces': self.interfaces_to_dict(snmpdump=snmpdump), } ) + return result diff --git a/netengine/backends/snmp/base.py b/netengine/backends/snmp/base.py index 9caff9d..8cbaf69 100644 --- a/netengine/backends/snmp/base.py +++ b/netengine/backends/snmp/base.py @@ -9,6 +9,7 @@ import logging import netaddr +from pysnmp.hlapi import ContextData, ObjectIdentity, ObjectType, SnmpEngine, nextCmd from netengine.backends import BaseBackend from netengine.exceptions import NetEngineError @@ -94,7 +95,26 @@ def _oid(self, oid): # ensure is string (could be unicode) return str(oid) - def get(self, oid): + def walk(self, oid): + result = dict() + for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd( + SnmpEngine(), + self.community, + self.transport, + ContextData(), + ObjectType(ObjectIdentity(oid)), + lexicographicMode=True, + ): + if errorIndication: + raise NetEngineError(errorIndication) + elif errorStatus: + raise NetEngineError(errorStatus) + else: + for varBind in varBinds: + result[str(varBind[0])] = [None, None, None, [varBind]] + return result + + def get(self, oid, snmpdump=None): """ alias to cmdgen.CommandGenerator().getCmd :oid string|tuple|list: string, tuple or list representing the OID to get @@ -106,10 +126,13 @@ def get(self, oid): * [1, 3, 6, 1, 2, 1, 1, 5, 0] * (1, 3, 6, 1, 2, 1, 1, 5, 0) """ + if snmpdump is not None: + # if OID doesn't exist, then pysnmp returns an empty string + return snmpdump.get(oid, [None, None, None, [[None, '']]]) logger.info(f'DEBUG: SNMP GET {self._oid(oid)}') return self._command.getCmd(self.community, self.transport, self._oid(oid)) - def next(self, oid): + def next(self, oid, snmpdump=None): """ alias to cmdgen.CommandGenerator().nextCmd :oid string|tuple|list: string, tuple or list representing the OID to get @@ -121,21 +144,26 @@ def next(self, oid): * [1, 3, 6, 1, 2, 1, 1, 5, 0] * (1, 3, 6, 1, 2, 1, 1, 5, 0) """ + if snmpdump is not None: + res = [None, 0, 0, []] + for item in snmpdump.items(prefix=oid): + res[3] += [item[1][3]] + return res logger.info(f'DEBUG: SNMP NEXT {self._oid(oid)}') return self._command.nextCmd(self.community, self.transport, self._oid(oid)) - def get_value(self, oid): + def get_value(self, oid, snmpdump=None): """ returns value of oid, or raises NetEngineError Exception is anything wrong :oid string|tuple|list: string, tuple or list representing the OID to get """ - result = self.get(oid) + result = self.get(oid, snmpdump=snmpdump) try: return str(result[3][0][1]) # snmp stores results in several arrays except IndexError: raise NetEngineError(str(result[0])) - def _value_to_retrieve(self): + def _value_to_retrieve(self, snmpdump=None): """ return the final SNMP indexes for the interfaces to be used in the other methods and properties """ @@ -146,7 +174,7 @@ def _value_to_retrieve(self): 'Please fix properly the _oid_to_retrieve string in OpenWRT or AirOS SNMP backend' ) - indexes = self.next(self._oid_to_retrieve)[3] + indexes = self.next(self._oid_to_retrieve, snmpdump=snmpdump)[3] for i in range(len(indexes)): value_to_retr.append(int(indexes[i][0][1])) diff --git a/netengine/backends/snmp/openwrt.py b/netengine/backends/snmp/openwrt.py index 20808f1..c7586fb 100644 --- a/netengine/backends/snmp/openwrt.py +++ b/netengine/backends/snmp/openwrt.py @@ -12,6 +12,7 @@ import pytz from netaddr import EUI, mac_unix_expanded +from pytrie import StringTrie as Trie from netengine.backends.snmp import SNMP @@ -25,62 +26,61 @@ class OpenWRT(SNMP): OpenWRT SNMP backend """ - _oid_to_retrieve = '1.3.6.1.2.1.2.2.1.1' + _oid_to_retrieve = '1.3.6.1.2.1.2.2.1.1.' _interface_dict = {} def __str__(self): """print a human readable object description""" return f'' - def validate(self): + def validate(self, snmpdump=None): """ raises NetEngineError exception if anything is wrong with the connection for example: wrong host, invalid community """ # this triggers a connection which # will raise an exception if anything is wrong - return self.name + return self.name(snmpdump=snmpdump) - @property - def os(self): + def os(self, snmpdump=None): """ returns (os_name, os_version) """ os_name = 'OpenWRT' - os_version = self.get_value('1.3.6.1.2.1.1.1.0').split('#')[0].strip() + os_version = ( + self.get_value('1.3.6.1.2.1.1.1.0', snmpdump=snmpdump).split('#')[0].strip() + ) return os_name, os_version - @property - def manufacturer(self): + def manufacturer(self, snmpdump=None): # TODO: this is dangerous, it might not work in all cases - return self.get_manufacturer(self.interfaces_MAC[1]['mac_address']) + return self.get_manufacturer( + self.interfaces_MAC(snmpdump=snmpdump)[1]['mac_address'] + ) - @property - def name(self): + def name(self, snmpdump=None): """ returns a string containing the device name """ - return self.get_value('1.3.6.1.2.1.1.5.0') + return self.get_value('1.3.6.1.2.1.1.5.0', snmpdump=snmpdump) - @property - def uptime(self): + def uptime(self, snmpdump=None): """ returns an integer representing the number of seconds of uptime """ - return int(self.get_value('1.3.6.1.2.1.1.3.0')) // 100 + return int(self.get_value('1.3.6.1.2.1.1.3.0', snmpdump=snmpdump)) // 100 - @property - def uptime_tuple(self): + def uptime_tuple(self, snmpdump=None): """ returns (days, hours, minutes) """ - td = timedelta(seconds=self.uptime) + td = timedelta(seconds=self.uptime(snmpdump=snmpdump)) return td.days, td.seconds // 3600, (td.seconds // 60) % 60 _interfaces = None - def get_interfaces(self): + def get_interfaces(self, snmpdump=None): """ returns the list of all the interfaces of the device """ @@ -88,11 +88,11 @@ def get_interfaces(self): interfaces = [] value_to_get = '1.3.6.1.2.1.2.2.1.2.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): value_to_get1 = value_to_get + str(i) if value_to_get1: - interfaces.append(self.get_value(value_to_get1)) + interfaces.append(self.get_value(value_to_get1, snmpdump=snmpdump)) self._interfaces = [_f for _f in interfaces if _f] @@ -100,24 +100,28 @@ def get_interfaces(self): _interfaces_MAC = None - @property - def interfaces_MAC(self): + def interfaces_MAC(self, snmpdump=None): """ Returns an ordered dict with the hardware address of every interface """ if self._interfaces_MAC is None: results = [] mac1 = [] - mac = self.next('1.3.6.1.2.1.2.2.1.6.')[3] + mac = self.next('1.3.6.1.2.1.2.2.1.6.', snmpdump=snmpdump)[3] for i in range(1, len(mac) + 1): - mac1.append(self.get_value('1.3.6.1.2.1.2.2.1.6.' + str(i))) + mac1.append( + self.get_value('1.3.6.1.2.1.2.2.1.6.' + str(i), snmpdump=snmpdump) + ) mac_trans = [] for i in mac1: mac_string = self._octet_to_mac(i) mac_trans.append(mac_string) - for i in range(0, len(self.get_interfaces())): + for i in range(0, len(self.get_interfaces(snmpdump=snmpdump))): result = self._dict( - {'name': self.get_interfaces()[i], 'mac_address': mac_trans[i]} + { + 'name': self.get_interfaces(snmpdump=snmpdump)[i], + 'mac_address': mac_trans[i], + } ) results.append(result) @@ -127,8 +131,7 @@ def interfaces_MAC(self): _interfaces_mtu = None - @property - def interfaces_mtu(self): + def interfaces_mtu(self, snmpdump=None): """ Returns an ordereed dict with the interface and its MTU """ @@ -139,11 +142,11 @@ def interfaces_mtu(self): tmp[18] = str(4) to = ''.join(tmp) - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'mtu': int(self.get_value(to + str(i))), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'mtu': int(self.get_value(to + str(i), snmpdump=snmpdump)), } ) results.append(result) @@ -154,8 +157,7 @@ def interfaces_mtu(self): _interfaces_speed = None - @property - def interfaces_speed(self): + def interfaces_speed(self, snmpdump=None): """ Returns an ordered dict with the interface and ist speed in bps """ @@ -176,7 +178,7 @@ def interfaces_speed(self): break # get name - name = self.get_value(starting + str(i)) + name = self.get_value(starting + str(i), snmpdump=snmpdump) # if nothing found if name == '': @@ -191,7 +193,7 @@ def interfaces_speed(self): consecutive_fails = 0 # get speed and convert to int - speed = int(self.get_value(starting_speed + str(i))) + speed = int(self.get_value(starting_speed + str(i), snmpdump=snmpdump)) result = self._dict({'name': name, 'speed': speed}) @@ -205,8 +207,7 @@ def interfaces_speed(self): _interfaces_up = None - @property - def interfaces_up(self): + def interfaces_up(self, snmpdump=None): """ Returns an ordereed dict with the interfaces and their state (up: true/false) """ @@ -216,16 +217,21 @@ def interfaces_up(self): operative = '1.3.6.1.2.1.2.2.1.8.' tmp = list(starting) tmp[18] = str(4) - for i in self._value_to_retrieve(): - if self.get_value(starting + str(i)) != '': + for i in self._value_to_retrieve(snmpdump=snmpdump): + if self.get_value(starting + str(i), snmpdump=snmpdump) != '': result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'up': int(self.get_value(operative + str(i))) == 1, + 'name': self.get_value( + starting + str(i), snmpdump=snmpdump + ), + 'up': int( + self.get_value(operative + str(i), snmpdump=snmpdump) + ) + == 1, } ) results.append(result) - elif self.get_value(starting + str(i)) == '': + elif self.get_value(starting + str(i), snmpdump=snmpdump) == '': result = self._dict({'name': '', 'state': ''}) results.append(result) @@ -235,8 +241,7 @@ def interfaces_up(self): _interfaces_bytes = None - @property - def interfaces_bytes(self): + def interfaces_bytes(self, snmpdump=None): """ Returns an ordereed dict with the interface and its tx and rx octets (1 octet = 1 byte = 8 bits) """ @@ -246,12 +251,16 @@ def interfaces_bytes(self): starting_rx = '1.3.6.1.2.1.2.2.1.10.' starting_tx = '1.3.6.1.2.1.2.2.1.16.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), - 'tx': int(self.get_value(starting_tx + str(i))), - 'rx': int(self.get_value(starting_rx + str(i))), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), + 'tx': int( + self.get_value(starting_tx + str(i), snmpdump=snmpdump) + ), + 'rx': int( + self.get_value(starting_rx + str(i), snmpdump=snmpdump) + ), } ) results.append(result) @@ -262,8 +271,7 @@ def interfaces_bytes(self): _interfaces_type = None - @property - def interfaces_type(self): + def interfaces_type(self, snmpdump=None): """ Returns an ordered dict with the interface type (e.g Ethernet, loopback) """ @@ -271,12 +279,13 @@ def interfaces_type(self): results = [] starting = '1.3.6.1.2.1.2.2.1.2.' types_oid = '1.3.6.1.2.1.2.2.1.3.' - for i in self._value_to_retrieve(): + for i in self._value_to_retrieve(snmpdump=snmpdump): result = self._dict( { - 'name': self.get_value(starting + str(i)), + 'name': self.get_value(starting + str(i), snmpdump=snmpdump), 'type': ifTypes.get( - self.get_value(types_oid + str(i)), 'unknown' + self.get_value(types_oid + str(i), snmpdump=snmpdump), + 'unknown', ), } ) @@ -287,20 +296,23 @@ def interfaces_type(self): _interface_addr_and_mask = None - @property - def interface_addr_and_mask(self): + def interface_addr_and_mask(self, snmpdump=None): """ TODO: this method needs to be simplified and explained """ if self._interface_addr_and_mask is None: - interface_name = self.get_interfaces() + interface_name = self.get_interfaces(snmpdump=snmpdump) for i in range(0, len(interface_name)): - self._interface_dict[self._value_to_retrieve()[i]] = interface_name[i] + self._interface_dict[ + self._value_to_retrieve(snmpdump=snmpdump)[i] + ] = interface_name[i] - interface_ip_address = self.next('1.3.6.1.2.1.4.20.1.1')[3] - interface_index = self.next('1.3.6.1.2.1.4.20.1.2')[3] - interface_netmask = self.next('1.3.6.1.2.1.4.20.1.3')[3] + interface_ip_address = self.next( + '1.3.6.1.2.1.4.20.1.1.', snmpdump=snmpdump + )[3] + interface_index = self.next('1.3.6.1.2.1.4.20.1.2.', snmpdump=snmpdump)[3] + interface_netmask = self.next('1.3.6.1.2.1.4.20.1.3.', snmpdump=snmpdump)[3] results = {} @@ -318,30 +330,29 @@ def interface_addr_and_mask(self): return self._interface_addr_and_mask - @property - def interfaces_to_dict(self): + def interfaces_to_dict(self, snmpdump=None): """ Returns an ordered dict with all the information available about the interface """ results = [] - for i in range(0, len(self.get_interfaces())): + for i in range(0, len(self.get_interfaces(snmpdump=snmpdump))): logger.info(f'====== {i} ======') logger.info('... name ...') - name = self.interfaces_MAC[i]['name'] + name = self.interfaces_MAC(snmpdump=snmpdump)[i]['name'] logger.info('... if_type ...') - if_type = self.interfaces_type[i]['type'] + if_type = self.interfaces_type(snmpdump=snmpdump)[i]['type'] logger.info('... mac_address ...') - mac_address = self.interfaces_MAC[i]['mac_address'] + mac_address = self.interfaces_MAC(snmpdump=snmpdump)[i]['mac_address'] logger.info('... rx_bytes ...') - rx_bytes = int(self.interfaces_bytes[i]['rx']) + rx_bytes = int(self.interfaces_bytes(snmpdump=snmpdump)[i]['rx']) logger.info('... tx_bytes ...') - tx_bytes = int(self.interfaces_bytes[i]['tx']) + tx_bytes = int(self.interfaces_bytes(snmpdump=snmpdump)[i]['tx']) logger.info('... up ...') - up = self.interfaces_up[i]['up'] + up = self.interfaces_up(snmpdump=snmpdump)[i]['up'] logger.info('... mtu ...') - mtu = int(self.interfaces_mtu[i]['mtu']) + mtu = int(self.interfaces_mtu(snmpdump=snmpdump)[i]['mtu']) result = self._dict( { @@ -359,12 +370,11 @@ def interfaces_to_dict(self): results.append(result) return results - @property - def local_time(self): + def local_time(self, snmpdump=None): """ returns the local time of the host device as a timestamp """ - octetstr = bytes(self.get('1.3.6.1.2.1.25.1.2.0')[3][0][1]) + octetstr = bytes(self.get('1.3.6.1.2.1.25.1.2.0', snmpdump=snmpdump)[3][0][1]) size = len(octetstr) # string may or may not contain timezone, so size can be 8 or 11 if size == 8: @@ -412,86 +422,90 @@ def local_time(self): ).timestamp() ) - @property - def RAM_total(self): + def RAM_total(self, snmpdump=None): """ returns the total RAM of the device """ - return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.1')) + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.1', snmpdump=snmpdump)) - @property - def RAM_shared(self): + def RAM_shared(self, snmpdump=None): """ returns the shared RAM of the device """ - return int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.8')) + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.8', snmpdump=snmpdump)) - @property - def RAM_cached(self): + def RAM_cached(self, snmpdump=None): """ returns the cached RAM of the device """ - return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.7')) + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.7', snmpdump=snmpdump)) - @property - def RAM_used(self): + def RAM_used(self, snmpdump=None): """ returns the used RAM of the device """ - return int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.1')) + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.1', snmpdump=snmpdump)) - @property - def RAM_free(self): + def RAM_free(self, snmpdump=None): """ returns the free RAM of the device """ - return int(self.RAM_total - (self.RAM_used - self.RAM_cached)) + return int( + self.RAM_total(snmpdump=snmpdump) + - (self.RAM_used(snmpdump=snmpdump) - self.RAM_cached(snmpdump=snmpdump)) + ) - @property - def SWAP_total(self): + def SWAP_total(self, snmpdump=None): """ returns the total SWAP of the device """ - return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.10')) + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.5.10', snmpdump=snmpdump)) - @property - def SWAP_free(self): + def SWAP_used(self, snmpdump=None): + """ + returns the used SWAP of the device + """ + return int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.10', snmpdump=snmpdump)) + + def SWAP_free(self, snmpdump=None): """ returns the free SWAP of the device """ - SWAP_used = int(self.get_value('1.3.6.1.2.1.25.2.3.1.6.10')) - SWAP_free = self.SWAP_total - SWAP_used + SWAP_free = self.SWAP_total(snmpdump=snmpdump) - self.SWAP_used( + snmpdump=snmpdump + ) return SWAP_free - @property - def CPU_count(self): + def CPU_count(self, snmpdump=None): """ returns the count of CPUs of the device """ - return len(self.next('1.3.6.1.2.1.25.3.3.1.2')[3]) + return len(self.next('1.3.6.1.2.1.25.3.3.1.2.', snmpdump=snmpdump)[3]) - @property - def resources_to_dict(self): + def resources_to_dict(self, snmpdump=None): """ returns an ordered dict with hardware resources information """ result = self._dict( { - 'cpus': self.CPU_count, + 'cpus': self.CPU_count(snmpdump=snmpdump), 'memory': { - 'total': self.RAM_total, - 'shared': self.RAM_shared, - 'used': self.RAM_used, - 'free': self.RAM_free, - 'cached': self.RAM_cached, + 'total': self.RAM_total(snmpdump=snmpdump), + 'shared': self.RAM_shared(snmpdump=snmpdump), + 'used': self.RAM_used(snmpdump=snmpdump), + 'free': self.RAM_free(snmpdump=snmpdump), + 'cached': self.RAM_cached(snmpdump=snmpdump), + }, + 'swap': { + 'total': self.SWAP_total(snmpdump=snmpdump), + 'free': self.SWAP_free(snmpdump=snmpdump), + 'used': self.SWAP_used(snmpdump=snmpdump), }, - 'swap': {'total': self.SWAP_total, 'free': self.SWAP_free}, } ) return result - @property - def neighbors(self): + def neighbors(self, snmpdump=None): """ returns a dict with neighbors information """ @@ -507,7 +521,7 @@ def neighbors(self): neighbors_oid = '1.3.6.1.2.1.4.35.1.4' neighbor_states_oid = '1.3.6.1.2.1.4.35.1.7' - neighbor_info = self.next('1.3.6.1.2.1.4.35.1')[3] + neighbor_info = self.next('1.3.6.1.2.1.4.35.1.', snmpdump=snmpdump)[3] neighbors = [] neighbor_states = [] result = [] @@ -529,7 +543,9 @@ def neighbors(self): int(neighbor[0][1].prettyPrint(), 16), dialect=mac_unix_expanded ) interface_num = neighbor[0][0].getOid()[10] - interface = self.get(f'1.3.6.1.2.1.31.1.1.1.1.{interface_num}')[3][0][1] + interface = self.get( + f'1.3.6.1.2.1.31.1.1.1.1.{interface_num}', snmpdump=snmpdump + )[3][0][1] state = states_map[str(neighbor_states[index][0][1])] except (IndexError, TypeError, ValueError): continue @@ -545,13 +561,19 @@ def neighbors(self): ) return result - def to_dict(self): - return self._dict( + def to_dict(self, snmpdump=None, autowalk=True): + if autowalk: + snmpdump = Trie(self.walk('1.3.6')) + result = self._dict( { 'type': 'DeviceMonitoring', - 'general': {'uptime': self.uptime, "local_time": self.local_time}, - 'resources': self.resources_to_dict, - 'interfaces': self.interfaces_to_dict, - 'neighbors': self.neighbors, + 'general': { + 'uptime': self.uptime(snmpdump=snmpdump), + "local_time": self.local_time(snmpdump=snmpdump), + }, + 'resources': self.resources_to_dict(snmpdump=snmpdump), + 'interfaces': self.interfaces_to_dict(snmpdump=snmpdump), + 'neighbors': self.neighbors(snmpdump=snmpdump), } ) + return result diff --git a/requirements.txt b/requirements.txt index 81d4504..b709a55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ netaddr~=0.8.0 pysnmp~=4.4.12 +pytrie~=0.4.0 diff --git a/tests/test_dummy.py b/tests/test_dummy.py index 7c5120c..54db4d6 100644 --- a/tests/test_dummy.py +++ b/tests/test_dummy.py @@ -24,6 +24,6 @@ def test_get_manufacturer(self): self.assertIn('Xensource, Inc.', str(self.dummy.get_manufacturer(dummy_addr))) def test_to_json(self): - json_string = self.dummy.to_json() + json_string = self.dummy.to_json(autowalk=False) self.assertTrue(isinstance(json_string, str)) json.loads(json_string) diff --git a/tests/test_snmp/test_airos.py b/tests/test_snmp/test_airos.py index 08e7b58..bfbe3e9 100644 --- a/tests/test_snmp/test_airos.py +++ b/tests/test_snmp/test_airos.py @@ -50,7 +50,8 @@ def test_get_value_error(self): def test_validate_negative_result(self): self.getcmd_patcher.stop() wrong = AirOS('10.40.0.254', 'wrong', 'wrong') - self.assertRaises(NetEngineError, wrong.validate) + with self.assertRaises(NetEngineError): + wrong.validate() def test_validate_positive_result(self): self.device.validate() @@ -68,95 +69,95 @@ def test_get(self): def test_properties(self): device = self.device - device.os - device.name - device.model - device.os - device.uptime + device.os() + device.name() + device.model() + device.os() + device.uptime() def test_name(self): - self.assertIsInstance(self.device.name, str) + self.assertIsInstance(self.device.name(), str) def test_os(self): - self.assertIsInstance(self.device.os, tuple) + self.assertIsInstance(self.device.os(), tuple) def test_get_interfaces(self): self.assertIsInstance(self.device.get_interfaces(), list) def test_get_interfaces_mtu(self): - self.assertIsInstance(self.device.interfaces_mtu, list) + self.assertIsInstance(self.device.interfaces_mtu(), list) def test_interfaces_state(self): - self.assertIsInstance(self.device.interfaces_state, list) + self.assertIsInstance(self.device.interfaces_state(), list) def test_interfaces_speed(self): - self.assertIsInstance(self.device.interfaces_speed, list) + self.assertIsInstance(self.device.interfaces_speed(), list) def test_interfaces_bytes(self): - self.assertIsInstance(self.device.interfaces_bytes, list) + self.assertIsInstance(self.device.interfaces_bytes(), list) def test_interfaces_MAC(self): - self.assertIsInstance(self.device.interfaces_MAC, list) + self.assertIsInstance(self.device.interfaces_MAC(), list) def test_interfaces_type(self): - self.assertIsInstance(self.device.interfaces_type, list) + self.assertIsInstance(self.device.interfaces_type(), list) def test_interfaces_to_dict(self): - self.assertIsInstance(self.device.interfaces_to_dict, list) + self.assertIsInstance(self.device.interfaces_to_dict(), list) def test_wireless_dbm(self): - self.assertIsInstance(self.device.wireless_dbm, list) + self.assertIsInstance(self.device.wireless_dbm(), list) def test_interfaces_number(self): - self.assertIsInstance(self.device.interfaces_number, int) + self.assertIsInstance(self.device.interfaces_number(), int) def test_wireless_to_dict(self): - self.assertIsInstance(self.device.wireless_links, list) + self.assertIsInstance(self.device.wireless_links(), list) def test_RAM_free(self): - self.assertIsInstance(self.device.RAM_free, int) + self.assertIsInstance(self.device.RAM_free(), int) def test_RAM_total(self): - self.assertIsInstance(self.device.RAM_total, int) + self.assertIsInstance(self.device.RAM_total(), int) def test_to_dict(self): - self.assertTrue(isinstance(self.device.to_dict(), dict)) + self.assertTrue(isinstance(self.device.to_dict(autowalk=False), dict)) def test_netjson_compliance(self): - device_dict = self.device.to_dict() - device_json = self.device.to_json() + device_dict = self.device.to_dict(autowalk=False) + device_json = self.device.to_json(autowalk=False) validate(instance=device_dict, schema=schema) validate(instance=json.loads(device_json), schema=schema) def test_manufacturer(self): - self.assertIsNotNone(self.device.manufacturer) + self.assertIsNotNone(self.device.manufacturer()) def test_model(self): - self.assertIsInstance(self.device.model, str) + self.assertIsInstance(self.device.model(), str) def test_firmware(self): - self.assertIsInstance(self.device.firmware, str) + self.assertIsInstance(self.device.firmware(), str) def test_uptime(self): - self.assertIsInstance(self.device.uptime, int) + self.assertIsInstance(self.device.uptime(), int) def test_RAM_buffered(self): - self.assertIsInstance(self.device.RAM_buffered, int) + self.assertIsInstance(self.device.RAM_buffered(), int) def test_RAM_cached(self): - self.assertIsInstance(self.device.RAM_cached, int) + self.assertIsInstance(self.device.RAM_cached(), int) def test_SWAP_total(self): - self.assertIsInstance(self.device.SWAP_total, int) + self.assertIsInstance(self.device.SWAP_total(), int) def test_SWAP_free(self): - self.assertIsInstance(self.device.SWAP_free, int) + self.assertIsInstance(self.device.SWAP_free(), int) def test_local_time(self): - self.assertIsInstance(self.device.local_time, int) + self.assertIsInstance(self.device.local_time(), int) def test_load(self): - load = self.device.load + load = self.device.load() self.assertIsInstance(load, list) self.assertEqual(len(load), 3) self.assertIsInstance(load[0], int) diff --git a/tests/test_snmp/test_openwrt.py b/tests/test_snmp/test_openwrt.py index e81e2f8..c48c26e 100644 --- a/tests/test_snmp/test_openwrt.py +++ b/tests/test_snmp/test_openwrt.py @@ -41,87 +41,87 @@ def setUp(self): self.nextcmd_patcher.start() def test_os(self): - self.assertIsInstance(self.device.os, tuple) + self.assertIsInstance(self.device.os(), tuple) def test_manufacturer(self): - self.assertIsNotNone(self.device.manufacturer) + self.assertIsNotNone(self.device.manufacturer()) def test_name(self): - self.assertIsInstance(self.device.name, str) + self.assertIsInstance(self.device.name(), str) def test_uptime(self): - self.assertIsInstance(self.device.uptime, int) + self.assertIsInstance(self.device.uptime(), int) def test_uptime_tuple(self): - self.assertIsInstance(self.device.uptime_tuple, tuple) + self.assertIsInstance(self.device.uptime_tuple(), tuple) def test_get_interfaces(self): self.assertIsInstance(self.device.get_interfaces(), list) def test_interfaces_speed(self): - self.assertIsInstance(self.device.interfaces_speed, list) + self.assertIsInstance(self.device.interfaces_speed(), list) def test_interfaces_bytes(self): - self.assertIsInstance(self.device.interfaces_bytes, list) + self.assertIsInstance(self.device.interfaces_bytes(), list) def test_interfaces_MAC(self): - self.assertIsInstance(self.device.interfaces_MAC, list) + self.assertIsInstance(self.device.interfaces_MAC(), list) def test_interfaces_type(self): - self.assertIsInstance(self.device.interfaces_type, list) + self.assertIsInstance(self.device.interfaces_type(), list) def test_interfaces_mtu(self): - self.assertIsInstance(self.device.interfaces_mtu, list) + self.assertIsInstance(self.device.interfaces_mtu(), list) def test_interfaces_state(self): - self.assertIsInstance(self.device.interfaces_up, list) + self.assertIsInstance(self.device.interfaces_up(), list) def test_interfaces_to_dict(self): - self.assertIsInstance(self.device.interfaces_to_dict, list) + self.assertIsInstance(self.device.interfaces_to_dict(), list) def test_interface_addr_and_mask(self): - self.assertIsInstance(self.device.interface_addr_and_mask, dict) + self.assertIsInstance(self.device.interface_addr_and_mask(), dict) def test_RAM_total(self): - self.assertIsInstance(self.device.RAM_total, int) + self.assertIsInstance(self.device.RAM_total(), int) def test_RAM_shared(self): - self.assertIsInstance(self.device.RAM_shared, int) + self.assertIsInstance(self.device.RAM_shared(), int) def test_RAM_cached(self): - self.assertIsInstance(self.device.RAM_cached, int) + self.assertIsInstance(self.device.RAM_cached(), int) def test_RAM_used(self): - self.assertIsInstance(self.device.RAM_used, int) + self.assertIsInstance(self.device.RAM_used(), int) def test_RAM_free(self): - self.assertIsInstance(self.device.RAM_free, int) + self.assertIsInstance(self.device.RAM_free(), int) def test_SWAP_total(self): - self.assertIsInstance(self.device.SWAP_total, int) + self.assertIsInstance(self.device.SWAP_total(), int) def test_SWAP_free(self): - self.assertIsInstance(self.device.SWAP_free, int) + self.assertIsInstance(self.device.SWAP_free(), int) def test_CPU_count(self): - self.assertIsInstance(self.device.CPU_count, int) + self.assertIsInstance(self.device.CPU_count(), int) def test_neighbors(self): - self.assertIsInstance(self.device.neighbors, list) + self.assertIsInstance(self.device.neighbors(), list) def test_local_time(self): - self.assertIsInstance(self.device.local_time, int) + self.assertIsInstance(self.device.local_time(), int) def test_to_dict(self): - device_dict = self.device.to_dict() + device_dict = self.device.to_dict(autowalk=False) self.assertIsInstance(device_dict, dict) self.assertEqual( len(device_dict['interfaces']), len(self.device.get_interfaces()), ) def test_netjson_compliance(self): - device_dict = self.device.to_dict() - device_json = self.device.to_json() + device_dict = self.device.to_dict(autowalk=False) + device_json = self.device.to_json(autowalk=False) validate(instance=device_dict, schema=schema) validate(instance=json.loads(device_json), schema=schema) diff --git a/tests/utils.py b/tests/utils.py index 0e1e2bb..e0ce095 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -59,18 +59,18 @@ def _get_nextcmd_list(return_value): return [None, None, None, return_value] res = { - '1.3.6.1.4.1.14988.1.1.1.2.1': [[[0, 0], 0]] * 28, - '1.3.6.1.4.1.14988.1.1.1.2.1.3': [0, 0], - '1.3.6.1.4.1.14988.1.1.1.2.1.3.0': [], - '1.3.6.1.2.1.1.9.1.1': [[[0, 1]], [[0, 2]], [[0, 3]], [[0, 4]], [[0, 5]]], + '1.3.6.1.4.1.14988.1.1.1.2.1.': [[[0, 0], 0]] * 28, + '1.3.6.1.4.1.14988.1.1.1.2.1.3.': [0, 0], + '1.3.6.1.4.1.14988.1.1.1.2.1.3.0.': [], + '1.3.6.1.2.1.1.9.1.1.': [[[0, 1]], [[0, 2]], [[0, 3]], [[0, 4]], [[0, 5]]], '1.3.6.1.2.1.2.2.1.6.': [[[0, 1]], [[0, 2]], [[0, 3]], [[0, 4]], [[0, 5]]], - '1.3.6.1.2.1.2.2.1.1': [[[0, 1]], [[0, 2]], [[0, 3]], [[0, 4]], [[0, 5]]], - '1.3.6.1.2.1.4.20.1.1': [[[0, OctetString('127.0.0.1')]]], - '1.3.6.1.2.1.4.20.1.2': [[[0, 1]]], - '1.3.6.1.2.1.25.3.3.1.2': [0, 2], - '1.3.6.1.2.1.4.20.1.3': [[[0, OctetString('192.168.0.1')]]], - '1.3.6.1.4.1.10002.1.1.1.4.2.1.3': [[[0, 51]], [[0, 18]], [[0, 24]]], - '1.3.6.1.2.1.4.35.1': [ + '1.3.6.1.2.1.2.2.1.1.': [[[0, 1]], [[0, 2]], [[0, 3]], [[0, 4]], [[0, 5]]], + '1.3.6.1.2.1.4.20.1.1.': [[[0, OctetString('127.0.0.1')]]], + '1.3.6.1.2.1.4.20.1.2.': [[[0, 1]]], + '1.3.6.1.2.1.25.3.3.1.2.': [0, 2], + '1.3.6.1.2.1.4.20.1.3.': [[[0, OctetString('192.168.0.1')]]], + '1.3.6.1.4.1.10002.1.1.1.4.2.1.3.': [[[0, 51]], [[0, 18]], [[0, 24]]], + '1.3.6.1.2.1.4.35.1.': [ [[[MockOid('1.3.6.1.2.1.4.35.1.4')]], OctetString('0x040e3cca555f')], [[[MockOid('1.3.6.1.2.1.4.35.1.7')]], 1], ],