Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port virtualbox scripts to VBoxManage CLI #625

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added virtualbox/__pycache__/vboxcommon.cpython-311.pyc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file should not be commited.

Binary file not shown.
143 changes: 84 additions & 59 deletions virtualbox/vbox-adapter-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,83 @@
import sys
import textwrap
import argparse
import virtualbox
from virtualbox.library import NetworkAttachmentType as NetType
import subprocess
import re
import time
import gi
gi.require_version('Notify', '0.7')
from gi.repository import Notify
from vboxcommon import *
Comment on lines 3 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend using isort for formatting consistency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and black for other formatting consistency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blockers, but might be nice

Copy link
Member

@Ana06 Ana06 Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea @williballenthin! We can address this after this PR has been merged in a different PR, added to #507 (comment). So that @stevemk14ebr does not need to add more things to this PR. πŸ˜‰


DYNAMIC_VM_NAME = '.dynamic'
DISABLED_ADAPTER_TYPE = NetType.host_only
ALLOWED_ADAPTER_TYPES = (NetType.host_only, NetType.internal, NetType.null)

ENABLED_STRS = ('Disabled','Enabled ')

def check_and_disable_internet_access(session, machine_name, max_adapters, skip_disabled, do_not_modify):
"""
Checks if a VM's network adapter is set to an internet-accessible mode
and disables it if necessary, showing a warning popup.

Args:
session: The session of the virtual machine to check.
"""
adapters_with_internet = []
for i in range(max_adapters):
adapter = session.machine.get_network_adapter(i)

if skip_disabled and not adapter.enabled:
continue

print(f"{machine_name} {i+1}: {ENABLED_STRS[adapter.enabled]} {adapter.attachment_type}")

if DYNAMIC_VM_NAME in machine_name and adapter.attachment_type not in ALLOWED_ADAPTER_TYPES:
adapters_with_internet.append(i)
if not do_not_modify:
# Disable the adapter
adapter.attachment_type = DISABLED_ADAPTER_TYPE

if adapters_with_internet:
adapters_str = ", ".join(str(i+1) for i in adapters_with_internet)
if do_not_modify:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. Please double check your VMs settings."
DISABLED_ADAPTER_TYPE = "hostonly"
ALLOWED_ADAPTER_TYPES = ("hostonly", "intnet", "none")

def get_vm_uuids(dynamic_only):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend type hints, at least for function signatures, as a form of documentation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker, but nice to have

"""Gets the machine UUID(s) for a given VM name using 'VBoxManage list vms'."""
machine_guids = []
try:
vms_output = run_vboxmanage(["list", "vms"])
pattern = r'"(.*?)" \{(.*?)\}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when dealing with regular expressions (and command output), i'd recommend including a few example lines of the pattern text, so its easy for a human to follow along. otherwise, i have to guess what this pattern does, which can be hard or impossible for some regular expressions.

matches = re.findall(pattern, vms_output)
if matches:
for match in matches:
vm_name = match[0]
machine_guid = match[1]
if dynamic_only and DYNAMIC_VM_NAME in vm_name:
machine_guids.append((vm_name, machine_guid))
else:
machine_guids.append((vm_name, machine_guid))
Comment on lines +29 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont understand this logic, aren't the branches the same on both sides?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was made to try to address my feedback from #625 (comment). We want to add all names if dynamic_only is not set, and only the VMs with .dynamic in the name if it is set. I think it should be:

Suggested change
if dynamic_only and DYNAMIC_VM_NAME in vm_name:
machine_guids.append((vm_name, machine_guid))
else:
machine_guids.append((vm_name, machine_guid))
if (not dynamic_only) or DYNAMIC_VM_NAME in vm_name:
machine_guids.append((vm_name, machine_guid))

except Exception as e:
raise Exception(f"Error finding machines UUIDs: {e}")
Comment on lines +33 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's a good idea to remove these exception handlers, since they're catching every exception and just re-rendering them into a string (lossily dropping the stack trace along the way).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. The new lines make it also difficult to read:

VM {40138663-f254-412b-8776-10a7cc08daea} is not running (state: poweroff). Starting VM...
Error running VBoxManage command: Command '['VBoxManage', 'startvm', '{40138663-f254-412b-8776-10a7cc08daea}', '--type', 'gui']' returned non-zero exit status 1. (VBoxManage: error: Nonexistent host networking interface, name '' (VERR_INTERNAL_ERROR)
VBoxManage: error: Details: code NS_ERROR_FAILURE (0x80004005), component ConsoleWrap, interface IConsole
)

This may need some changes in the code (like having to return None in some functions), but I think it makes both the Python code and the exceptions nicer.

return machine_guids

def change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, do_not_modify):
"""Verify all adapters are in an allowed configuration. Must be poweredoff"""
Comment on lines +37 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this verify (read only validation) or change the VMs (modify the state)? i find the method name and docstring contradictory.

try:
# gather adapters in incorrect configurations
nics_with_internet = []
invalid_nics_msg = ''
vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"])
for nic_number, nic_value in re.findall("^nic(\d+)=\"(\S+)\"", vminfo, flags=re.M):
if nic_value not in ALLOWED_ADAPTER_TYPES:
nics_with_internet.append(f"nic{nic_number}")
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved
invalid_nics_msg += f'{nic_number} '

# modify the invalid adapters if allowed
if nics_with_internet:
for nic in nics_with_internet:
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its ok to iterate directly over the empty list, like:

for nic in nics_with_internet:
    ...

which saves a level of indentation.

if do_not_modify:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. Please double check your VMs settings."
else:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."
# different commands are necessary if the machine is running.
if get_vm_state(machine_guid) == "poweroff":
run_vboxmanage(["modifyvm", machine_guid, f"--{nic}", DISABLED_ADAPTER_TYPE])
else:
run_vboxmanage(["controlvm", machine_guid, nic, "hostonly", hostonly_ifname])
print(f"Set VM {vm_name} adaper {nic} to hostonly")

if do_not_modify:
message = f"{vm_name} may be connected to the internet on adapter(s): {invalid_nics_msg}. Please double check your VMs settings."
else:
message = f"{vm_name} may be connected to the internet on adapter(s): {invalid_nics_msg}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."

# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {vm_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()
Comment on lines +68 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good candidate for a helper function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the message should be written to stdout as well, for debugging/cli use?

print(f"{vm_name} network configuration not ok, sent notifaction")
return
else:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."

# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {machine_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()

session.machine.save_settings()
session.unlock_machine()
print(f"{vm_name} network configuration is ok")
return

except Exception as e:
print(f"Error changing network adapters: {e}")
raise Exception("Failed to verify VM adapter configuration")

def main(argv=None):
if argv is None:
Expand All @@ -66,12 +93,6 @@ def main(argv=None):

# Print status of all internet adapters without modifying any of them
vbox-adapter-check.vm --do_not_modify

# Print status of enabled internet adapters and disabled the enabled adapters with internet access in VMs with {DYNAMIC_VM_NAME} in the name
vbox-adapter-check.vm --skip_disabled

# # Print status of enabled internet adapters without modifying any of them
vbox-adapter-check.vm --skip_disabled --do_not_modify
"""
)
parser = argparse.ArgumentParser(
Expand All @@ -80,15 +101,19 @@ def main(argv=None):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--do_not_modify", action="store_true", help="Only print the status of the internet adapters without modifying them.")
parser.add_argument("--skip_disabled", action="store_true", help="Skip the disabled adapters.")
parser.add_argument("--dynamic_only", action="store_true", help="Only scan VMs with .dynamic in the name")
args = parser.parse_args(args=argv)

vbox = virtualbox.VirtualBox()
for machine in vbox.machines:
session = machine.create_session()
max_adapters = vbox.system_properties.get_max_network_adapters(machine.chipset_type)
check_and_disable_internet_access(session, machine.name, max_adapters, args.skip_disabled, args.do_not_modify)

try:
hostonly_ifname = ensure_hostonlyif_exists()
machine_guids = get_vm_uuids(args.dynamic_only)
if len(machine_guids) > 0:
for vm_name, machine_guid in machine_guids:
change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, args.do_not_modify)
else:
print(f"[Warning ⚠️] No VMs found")
except Exception as e:
print(f"Error verifying dynamic VM hostonly configuration: {e}")

if __name__ == "__main__":
main()
main()
98 changes: 63 additions & 35 deletions virtualbox/vbox-clean-snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,77 @@
import sys
import argparse
import textwrap
import virtualbox
from virtualbox.library import MachineState


TO_DELETE = []


def get_snapshots_to_delete(snapshot, protected_snapshots):
for child in snapshot.children:
get_snapshots_to_delete(child, protected_snapshots)
snapshot_name = snapshot.name.lower()
for protected_str in protected_snapshots:
if protected_str.lower() in snapshot_name:
return
TO_DELETE.append((snapshot.name, snapshot.id_p))

import subprocess
import re
from vboxcommon import *

def get_snapshot_children(vm_name, root_snapshot_name, protected_snapshots):
"""Gets the children of a snapshot using 'VBoxManage showvminfo'.

Args:
vm_name: The name of the VM.
snapshot_name: The name of the snapshot.

Returns:
A list of snapshot names that are children of the given snapshot.
Comment on lines +13 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice documentation!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this return all descendents recursively? or only the direct children?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should return all. I think it would be good to clarify it in the documentation. πŸ‘

"""
try:
vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"])
# Find all snapshot names
snapshot_regex = rf'(SnapshotName(?:-\d+)*)=\"(.*?)\"'
snapshots = re.findall(snapshot_regex, vminfo, flags=re.M)

children = []

# find the root SnapshotName by matching the name
root_snapshotid = None
for snapshotid, snapshot_name in snapshots:
if snapshot_name.lower() == root_snapshot_name.lower() and (not any(p.lower() in snapshot_name.lower() for p in protected_snapshots)):
root_snapshotid = snapshotid

if not root_snapshotid:
print("Failed to find root snapshot")
raise Exception(f"Failed to find root snapshot {snapshot_name}")

# children of that snapshot share the same prefix id
dependant_child = False
for snapshotid, snapshot_name in snapshots:
if snapshotid.startswith(root_snapshotid):
if not any(p.lower() in snapshot_name.lower() for p in protected_snapshots):
children.append((snapshotid, snapshot_name))
else:
dependant_child = True

# remove the root snapshot if any children are protected OR it's the current snapshot
if dependant_child:
print("Root snapshot cannot be deleted as a child snapshot is protected")
children = [snapshot for snapshot in children if snapshot[0] != root_snapshotid]
Comment on lines +23 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i can follow this logic, and it seems reasonable, but without some example output, i really have any idea.

furthermore, i wouldn't be comfortable changing this logic without any tests. how would i know if i broke anything (especially without vboxmanage installed)?

i'd recommend making this parsing region its own function (input: str, output: list), and then adding some test cases with known data. the test cases will serve both as documentation (since readers can see some examples of real-world data) and to prevent regressions.

return children
except Exception as e:
print(f"Error getting snapshot children: {e}")
raise Exception(f"Could not get snapshot children for '{vm_name}'")

def delete_snapshot_and_children(vm_name, snapshot_name, protected_snapshots):
vbox = virtualbox.VirtualBox()
vm = vbox.find_machine(vm_name)
snapshot = vm.find_snapshot(snapshot_name)
get_snapshots_to_delete(snapshot, protected_snapshots)
TO_DELETE = get_snapshot_children(vm_name, snapshot_name, protected_snapshots)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider reserving upper snake case names for constants

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker


if TO_DELETE:
print(f"\nCleaning {vm_name} 🫧 Snapshots to delete:")
for name, _ in TO_DELETE:
print(f" {name}")
for snapshotid, snapshot_name in TO_DELETE:
print(f" {snapshot_name}")

if vm.state not in (MachineState.powered_off, MachineState.saved):
print(f"\nVM state: {vm.state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")
vm_state = get_vm_state(vm_name)
if vm_state not in ("poweroff", "saved"):
print(f"\nVM state: {vm_state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")

answer = input("\nConfirm deletion ('y'):")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is "y" the default, or is that what you should press to confirm?

from the code i see its what you should press, but i'm afraid that in this format the message suggests "y" is the default. maybe change to "press 'y'".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that the user has to press 'y', as this deletes the snapshots and there is no way to recover them. I agree it would be good to clarify it.

if answer.lower() == "y":
print("\nDeleting... (this may take some time, go for an 🍦!)")
session = vm.create_session()
for name, uuid in TO_DELETE:
for snapshotid, snapshot_name in TO_DELETE[::-1]: # delete in reverse order to avoid issues with child snapshots
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for snapshotid, snapshot_name in TO_DELETE[::-1]: # delete in reverse order to avoid issues with child snapshots
for snapshotid, snapshot_name in reversed(TO_DELETE): # delete in reverse order to avoid issues with child snapshots

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, interesting behavior. i think get_snapshot_children should document that the order of the result has a particular meaning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug in my original script, that @stevemk14ebr corrected here. I think it is a good idea to document this in the get_snapshot_children function. πŸ‘

try:
progress = session.machine.delete_snapshot(uuid)
progress.wait_for_completion(-1)
print(f" 🫧 DELETED '{name}'")
run_vboxmanage(["snapshot", vm_name, "delete", snapshot_name])
print(f" 🫧 DELETED '{snapshot_name}'")
except Exception as e:
print(f" ❌ ERROR '{name}': {e}")
session.unlock_machine()
print(f" ❌ ERROR '{snapshot_name}': {e}")
else:
print(f"\n{vm_name} is clean 🫧")

Expand All @@ -67,10 +95,10 @@ def main(argv=None):

# Delete the 'CLEAN with IDA 8.4' children snapshots recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
# NOTE: the 'CLEAN with IDA 8.4' root snapshot is skipped in this case
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'CLEAN with IDA 8.4'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot CLEAN with IDA 8.4

# Delete the 'Snapshot 3' snapshot and its children recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'Snapshot 3'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot Snapshot 3

# Delete all snapshots in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --protected_snapshots ""
Expand All @@ -84,7 +112,7 @@ def main(argv=None):
parser.add_argument("vm_name", help="Name of the VM to clean up")
parser.add_argument("--root_snapshot", default="", help="Snapshot to delete (and its children recursively). Leave empty to clean all snapshots in the VM.")
parser.add_argument(
"--protected_snapshots",
"--protected_snapshots",
default="clean,done",
type=lambda s: s.split(","),
help='Comma-separated list of strings. Snapshots with any of the strings included in the name (case insensitive) are not deleted. Default: "clean,done"',
Expand All @@ -95,4 +123,4 @@ def main(argv=None):


if __name__ == "__main__":
main()
main()
Loading