diff --git a/nebula/__init__.py b/nebula/__init__.py index d6c0ae3..65a89c2 100644 --- a/nebula/__init__.py +++ b/nebula/__init__.py @@ -11,7 +11,7 @@ from nebula.helper import helper from nebula.jtag import jtag from nebula.manager import manager -from nebula.netbox import netbox, NetboxDevice, NetboxDevices +from nebula.netbox import NetboxDevice, NetboxDevices, netbox from nebula.netconsole import netconsole from nebula.network import network from nebula.pdu import pdu diff --git a/nebula/downloader.py b/nebula/downloader.py index f448775..ff29c4e 100644 --- a/nebula/downloader.py +++ b/nebula/downloader.py @@ -1061,7 +1061,7 @@ def download(self, url, fname): sha256_hash.update(data) bar.update(size) hash = sha256_hash.hexdigest() - with open(os.path.join(os.path.dirname(fname),"hashes.txt"),"a") as h: + with open(os.path.join(os.path.dirname(fname), "hashes.txt"), "a") as h: h.write(f"{os.path.basename(fname)},{hash}\n") def check(self, fname, ref): @@ -1106,5 +1106,3 @@ def extract(self, inname, outname): size = file.write(result) bar.update(size) data = ifile.read(1024) - - diff --git a/nebula/manager.py b/nebula/manager.py index 146da39..0bc3781 100644 --- a/nebula/manager.py +++ b/nebula/manager.py @@ -930,16 +930,13 @@ def verify_checksum(self, folder): if not hash_file: log.warning("Hash file not found, will not proceed with verification") return - - with open(hash_file, 'r') as file: + + with open(hash_file, "r") as file: for line in file: fname, hash = line.strip().split(",") # exclude some files if fname in ["bootgen_sysfiles.tgz"]: continue self.net.verify_checksum( - file_path=os.path.join("/boot",fname), - reference=hash + file_path=os.path.join("/boot", fname), reference=hash ) - - diff --git a/nebula/netbox.py b/nebula/netbox.py index 2497b78..c2ace3e 100644 --- a/nebula/netbox.py +++ b/nebula/netbox.py @@ -95,24 +95,23 @@ def __init__( def interface(self): return self.nb - - def get_user_from_token(self,token=None): + + def get_user_from_token(self, token=None): if not token: token = self.netbox_api_token - users = [ _token.user for _token in self.nb.users.tokens.filter(key=token)] + users = [_token.user for _token in self.nb.users.tokens.filter(key=token)] if len(users) != 1: raise IndexError("Token not matched") return users[0] - def get_device(self, id=None, name=None, slug=None): kwargs = dict() if id: - kwargs.update({"id":id}) + kwargs.update({"id": id}) elif name: - kwargs.update({"name":name}) + kwargs.update({"name": name}) elif slug: - kwargs.update({"slug":slug}) + kwargs.update({"slug": slug}) return self.nb.dcim.devices.get(**kwargs) def get_devices(self, **filters): @@ -131,28 +130,23 @@ def update_device(self, id, fields): self.nb.dcim.devices.update([device]) def update_status(self, device_id, status): - statuses = [ - "offline", - "active", - "planned", - "staged", - "failed", - "inventory" - ] + statuses = ["offline", "active", "planned", "staged", "failed", "inventory"] if status not in statuses: raise ValueError(f"Status {status} is not supported") self.update_device(id=device_id, fields={"status": status}) - def log_journal(self, device_id, author_id, kind="info", comments="Automated journal entry"): - kinds = ["info","success", "warning", "danger "] - if not kind in kinds: + def log_journal( + self, device_id, author_id, kind="info", comments="Automated journal entry" + ): + kinds = ["info", "success", "warning", "danger "] + if kind not in kinds: raise ValueError(f"kind only accepts {kinds}") self.nb.extras.journal_entries.create( - assigned_object_type = "dcim.device", - assigned_object_id = device_id, - created_by = author_id, - kind = kind, - comments = comments, + assigned_object_type="dcim.device", + assigned_object_id=device_id, + created_by=author_id, + kind=kind, + comments=comments, ) def add_tag(self, device_id, tag): @@ -307,11 +301,11 @@ def __init__( # noqa: C901 ): self.data = dict() - if type(netbox_interface) == netbox: + if isinstance(netbox_interface, netbox): self.nbi = netbox_interface else: raise TypeError("Must be of type Netbox interface (netbox)") - + if not device_name: device_name = netbox_interface.board_name @@ -566,19 +560,20 @@ def to_config(self, template): # noqa: C901 return template_dict_list def enable(self, reason="nebula: Enable device"): - device_id =self.data["devices"]["id"] + device_id = self.data["devices"]["id"] author = self.nbi.get_user_from_token() self.nbi.add_tag(device_id=device_id, tag="active") self.nbi.update_status(device_id=device_id, status="active") self.nbi.log_journal(device_id=device_id, author_id=author.id, comments=reason) - def disable(self,reason="nebula: Disable device"): - device_id =self.data["devices"]["id"] + def disable(self, reason="nebula: Disable device"): + device_id = self.data["devices"]["id"] author = self.nbi.get_user_from_token() self.nbi.remove_tag(device_id=device_id, tag="active") self.nbi.update_status(device_id=device_id, status="offline") self.nbi.log_journal(device_id=device_id, author_id=author.id, comments=reason) + class NetboxDevices: """List of NetboxDevice""" diff --git a/nebula/network.py b/nebula/network.py index e0eeb18..36fe70c 100644 --- a/nebula/network.py +++ b/nebula/network.py @@ -407,12 +407,12 @@ def run_diagnostics(self): def verify_checksum(self, file_path, reference, algo="sha256"): if algo == "sha256": - ssh_command = "python -c \"import hashlib;" + ssh_command = 'python -c "import hashlib;' ssh_command += f" print(hashlib.sha256(open('{file_path}', 'rb').read()).hexdigest())\"" result = self.run_ssh_command( - command=ssh_command, - print_result_to_file=False, - show_log=False + command=ssh_command, print_result_to_file=False, show_log=False ) if not reference == result.stdout.strip(): - raise Exception(f"Checksum for {file_path} does not match {result.stdout.strip()}") + raise Exception( + f"Checksum for {file_path} does not match {result.stdout.strip()}" + ) diff --git a/nebula/tasks.py b/nebula/tasks.py index f4c8cb1..cc1c906 100644 --- a/nebula/tasks.py +++ b/nebula/tasks.py @@ -15,6 +15,7 @@ class MyFilter(logging.Filter): def filter(self, record): return "nebula" in record.name + LINUX_DEFAULT_PATH = "/etc/default/nebula" WINDOWS_DEFAULT_PATH = "C:\\nebula\\nebula.yaml" @@ -912,6 +913,7 @@ def update_boot_files_manager( else: m.board_reboot_auto_folder(folder=folder, sdcard=sdcard, design_name=board_name) + @task( help={ "folder": "Resource folder containing BOOT.BIN, kernel, device tree, and system_top.bit.\nOverrides other setting", @@ -929,6 +931,7 @@ def verify_checksum_manager( m = nebula.manager(configfilename=yamlfilename, board_name=board_name) m.verify_checksum(folder) + manager = Collection("manager") manager.add_task(update_boot_files_manager, name="update_boot_files") manager.add_task(update_boot_files_jtag_manager, name="update_boot_files_jtag") @@ -1423,7 +1426,7 @@ def check_board_booted( @task( help={ - "reason": "Reason why board wil be disabled", + "reason": "Reason why board will be disabled", "netbox_ip": "IP address of netbox instance", "netbox_port": "Network port of netbox instance", "netbox_token": "Netbox access token", @@ -1460,7 +1463,7 @@ def disable_board( @task( help={ - "reason": "Reason why board wil be enabled", + "reason": "Reason why board will be enabled", "netbox_ip": "IP address of netbox instance", "netbox_port": "Network port of netbox instance", "netbox_token": "Netbox access token", diff --git a/pyproject.toml b/pyproject.toml index 1828fc7..d38527e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ pysnmp = "^4.4.12" pyasn1 = "^0.4.8" xmodem = "^0.4.7" netifaces = "^0.10.9" -click = "^7.1.2" +click = "^8.0.0" usbsdmux = "^0.2.1" tqdm = "^4.62.3" beautifulsoup4 = "^4.12.3"