Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port virtualbox scripts to VBoxManage CLI #625

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 148 additions & 59 deletions virtualbox/vbox-adapter-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,148 @@
import sys
import textwrap
import argparse
import virtualbox
from virtualbox.library import NetworkAttachmentType as NetType
import subprocess
import re
import time
import gi
gi.require_version('Notify', '0.7')
from gi.repository import Notify

DYNAMIC_VM_NAME = '.dynamic'
DISABLED_ADAPTER_TYPE = NetType.host_only
ALLOWED_ADAPTER_TYPES = (NetType.host_only, NetType.internal, NetType.null)

ENABLED_STRS = ('Disabled','Enabled ')

def check_and_disable_internet_access(session, machine_name, max_adapters, skip_disabled, do_not_modify):
"""
Checks if a VM's network adapter is set to an internet-accessible mode
and disables it if necessary, showing a warning popup.

Args:
session: The session of the virtual machine to check.
"""
adapters_with_internet = []
for i in range(max_adapters):
adapter = session.machine.get_network_adapter(i)

if skip_disabled and not adapter.enabled:
continue

print(f"{machine_name} {i+1}: {ENABLED_STRS[adapter.enabled]} {adapter.attachment_type}")

if DYNAMIC_VM_NAME in machine_name and adapter.attachment_type not in ALLOWED_ADAPTER_TYPES:
adapters_with_internet.append(i)
if not do_not_modify:
# Disable the adapter
adapter.attachment_type = DISABLED_ADAPTER_TYPE

if adapters_with_internet:
adapters_str = ", ".join(str(i+1) for i in adapters_with_internet)
if do_not_modify:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. Please double check your VMs settings."
DISABLED_ADAPTER_TYPE = "hostonly"
ALLOWED_ADAPTER_TYPES = ("hostonly", "intnet", "none")

# cmd is an array of string arguments to pass
def run_vboxmanage(cmd):
"""Runs a VBoxManage command and returns the output."""
try:
result = subprocess.run(["VBoxManage"] + cmd, capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
# exit code is an error
print(f"Error running VBoxManage command: {e} ({e.stderr})")
raise Exception(f"Error running VBoxManage command: {e}")

def get_vm_state(machine_guid):
"""Gets the VM state using 'VBoxManage showvminfo'."""
vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"])
for line in vminfo.splitlines():
if line.startswith("VMState"):
return line.split("=")[1].strip('"')
raise Exception(f"Could not start VM '{machine_guid}'")

def ensure_hostonlyif_exists():
"""Gets the name of, or creates a new hostonlyif"""
try:
# Find existing hostonlyif
hostonlyifs_output = run_vboxmanage(["list", "hostonlyifs"])
for line in hostonlyifs_output.splitlines():
if line.startswith("Name:"):
hostonlyif_name = line.split(":")[1].strip()
print(f"Found existing hostonlyif {hostonlyif_name}")
return hostonlyif_name

# No host-only interface found, create one
print("No host-only interface found. Creating one...")
run_vboxmanage(["hostonlyif", "create"]) # Create a host-only interface
hostonlyifs_output = run_vboxmanage(["list", "hostonlyifs"]) # Get the updated list
for line in hostonlyifs_output.splitlines():
if line.startswith("Name:"):
hostonlyif_name = line.split(":")[1].strip()
print(f"Created hostonlyif {hostonlyif_name}")
return hostonlyif_name
print("Failed to create new hostonlyif. Exiting...")
raise Exception("Failed to create new hostonlyif.")
except Exception as e:
print(f"Error getting host-only interface name: {e}")
raise Exception("Failed to verify host-only interface exists")

def ensure_vm_shutdown(machine_guid):
"""Checks if the VM is running and shuts it down if it is."""
try:
vm_state = get_vm_state(machine_guid)
if vm_state != "poweroff":
print(f"VM {machine_guid} is not powered off. Shutting down VM...")
run_vboxmanage(["controlvm", machine_guid, "poweroff"])

# Wait for VM to shut down (up to 1 minute)
timeout = 60 # seconds
check_interval = 5 # seconds
start_time = time.time()
while time.time() - start_time < timeout:
vm_state = get_vm_state(machine_guid)
if vm_state == "poweroff":
print(f"VM {machine_guid} is shut down (status: {vm_state}).")
time.sleep(5) # wait a bit to be careful and avoid any weird races
return
time.sleep(check_interval)
print("Timeout waiting for VM to shut down. Exiting...")
raise TimeoutError("VM did not shut down within the timeout period.")
else:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."

# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {machine_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()

session.machine.save_settings()
session.unlock_machine()
print(f"VM {machine_guid} is already shut down (state: {vm_state}).")
return
except Exception as e:
print(f"Error checking VM state: {e}")
raise Exception(f"Could not ensure '{machine_guid}' shutdown")

def get_dynamic_vm_uuids():
"""Gets the machine UUID(s) for a given VM name using 'VBoxManage list vms'."""
dynamic_machine_guids = []
try:
vms_output = run_vboxmanage(["list", "vms"])
pattern = r'"(.*?)" \{(.*?)\}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when dealing with regular expressions (and command output), i'd recommend including a few example lines of the pattern text, so its easy for a human to follow along. otherwise, i have to guess what this pattern does, which can be hard or impossible for some regular expressions.

matches = re.findall(pattern, vms_output)
if matches:
for match in matches:
vm_name = match[0]
machine_guid = match[1]
if DYNAMIC_VM_NAME in vm_name:
dynamic_machine_guids.append((vm_name, machine_guid))
except Exception as e:
print(f"Error finding dynamic machines UUIDs: {e}")
raise Exception(f"Error finding dynamic machines UUIDs: {e}")
return dynamic_machine_guids

def change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, do_not_modify):
"""Verify all adapters are in an allowed configuration. Must be poweredoff"""
Comment on lines +37 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this verify (read only validation) or change the VMs (modify the state)? i find the method name and docstring contradictory.

try:
# gather adapters in incorrect configurations
nics_with_internet = []
vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"])
for nic_number, nic_value in re.findall("^nic(\d+)=\"(\S+)\"", vminfo, flags=re.M):
if nic_value not in ALLOWED_ADAPTER_TYPES:
nics_with_internet.append(f"nic{nic_number}")
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved

# modify the invalid adapters if allowed
if nics_with_internet:
for nic in nics_with_internet:
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its ok to iterate directly over the empty list, like:

for nic in nics_with_internet:
    ...

which saves a level of indentation.

if do_not_modify:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. Please double check your VMs settings."
else:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."
# different commands are necessary if the machine is running.
if get_vm_state(machine_guid) == "poweroff":
run_vboxmanage(["modifyvm", machine_guid, f"--{nic}", DISABLED_ADAPTER_TYPE])
print(f"Set VM {nic} to hostonly")
else:
run_vboxmanage(["controlvm", machine_guid, nic, "hostonly", hostonly_ifname])
print(f"Set VM {nic} to hostonly")
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved

# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {vm_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()
print(f"{vm_name} network configuration not ok, sent notifaction")
return
else:
print(f"{vm_name} network configuration is ok")
return

except Exception as e:
print(f"Error changing network adapters: {e}")
raise Exception("Failed to verify VM adapter configuration")

def main(argv=None):
if argv is None:
Expand All @@ -66,12 +158,6 @@ def main(argv=None):

# Print status of all internet adapters without modifying any of them
vbox-adapter-check.vm --do_not_modify

# Print status of enabled internet adapters and disabled the enabled adapters with internet access in VMs with {DYNAMIC_VM_NAME} in the name
vbox-adapter-check.vm --skip_disabled

# # Print status of enabled internet adapters without modifying any of them
vbox-adapter-check.vm --skip_disabled --do_not_modify
"""
)
parser = argparse.ArgumentParser(
Expand All @@ -80,15 +166,18 @@ def main(argv=None):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--do_not_modify", action="store_true", help="Only print the status of the internet adapters without modifying them.")
parser.add_argument("--skip_disabled", action="store_true", help="Skip the disabled adapters.")
args = parser.parse_args(args=argv)

vbox = virtualbox.VirtualBox()
for machine in vbox.machines:
session = machine.create_session()
max_adapters = vbox.system_properties.get_max_network_adapters(machine.chipset_type)
check_and_disable_internet_access(session, machine.name, max_adapters, args.skip_disabled, args.do_not_modify)

try:
hostonly_ifname = ensure_hostonlyif_exists()
dynamic_machine_guids = get_dynamic_vm_uuids()
stevemk14ebr marked this conversation as resolved.
Show resolved Hide resolved
if len(dynamic_machine_guids) > 0:
for vm_name, machine_guid in dynamic_machine_guids:
change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, args.do_not_modify)
else:
print(f"[Warning ⚠️] No Dynamic VMs found")
except Exception as e:
print(f"Error verifying dynamic VM hostonly configuration: {e}")

if __name__ == "__main__":
main()
main()
115 changes: 80 additions & 35 deletions virtualbox/vbox-clean-snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,94 @@
import sys
import argparse
import textwrap
import virtualbox
from virtualbox.library import MachineState


TO_DELETE = []


def get_snapshots_to_delete(snapshot, protected_snapshots):
for child in snapshot.children:
get_snapshots_to_delete(child, protected_snapshots)
snapshot_name = snapshot.name.lower()
for protected_str in protected_snapshots:
if protected_str.lower() in snapshot_name:
return
TO_DELETE.append((snapshot.name, snapshot.id_p))

import subprocess
import re

def run_vboxmanage(cmd):
"""Runs a VBoxManage command and returns the output."""
try:
result = subprocess.run(["VBoxManage"] + cmd, capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
# exit code is an error
print(f"Error running VBoxManage command: {e} ({e.stderr})")
raise Exception(f"Error running VBoxManage command")

def get_vm_state(vm_name):
"""Gets the VM state using 'VBoxManage showvminfo'."""
vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"])
for line in vminfo.splitlines():
if line.startswith("VMState"):
return line.split("=")[1].strip('"')
raise Exception(f"Could not get VM state for '{vm_name}'")

def get_snapshot_children(vm_name, root_snapshot_name, protected_snapshots):
"""Gets the children of a snapshot using 'VBoxManage showvminfo'.

Args:
vm_name: The name of the VM.
snapshot_name: The name of the snapshot.

Returns:
A list of snapshot names that are children of the given snapshot.
Comment on lines +13 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice documentation!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this return all descendents recursively? or only the direct children?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should return all. I think it would be good to clarify it in the documentation. πŸ‘

"""
try:
vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"])
# Find all snapshot names
snapshot_regex = rf'(SnapshotName(?:-\d+)*)=\"(.*?)\"'
snapshots = re.findall(snapshot_regex, vminfo, flags=re.M)

children = []

# find the root SnapshotName by matching the name
root_snapshotid = None
for snapshotid, snapshot_name in snapshots:
if snapshot_name.lower() == root_snapshot_name.lower() and (not any(p.lower() in snapshot_name.lower() for p in protected_snapshots)):
root_snapshotid = snapshotid

if not root_snapshotid:
print("Failed to find root snapshot")
raise Exception(f"Failed to find root snapshot {snapshot_name}")

# children of that snapshot share the same prefix id
dependant_child = False
for snapshotid, snapshot_name in snapshots:
if snapshotid.startswith(root_snapshotid):
if not any(p.lower() in snapshot_name.lower() for p in protected_snapshots):
children.append((snapshotid, snapshot_name))
else:
dependant_child = True

# remove the root snapshot if any children are protected OR it's the current snapshot
if dependant_child:
print("Root snapshot cannot be deleted as a child snapshot is protected")
children = [snapshot for snapshot in children if snapshot[0] != root_snapshotid]
Comment on lines +23 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i can follow this logic, and it seems reasonable, but without some example output, i really have any idea.

furthermore, i wouldn't be comfortable changing this logic without any tests. how would i know if i broke anything (especially without vboxmanage installed)?

i'd recommend making this parsing region its own function (input: str, output: list), and then adding some test cases with known data. the test cases will serve both as documentation (since readers can see some examples of real-world data) and to prevent regressions.

return children
except Exception as e:
print(f"Error getting snapshot children: {e}")
raise Exception(f"Could not get snapshot children for '{vm_name}'")

def delete_snapshot_and_children(vm_name, snapshot_name, protected_snapshots):
vbox = virtualbox.VirtualBox()
vm = vbox.find_machine(vm_name)
snapshot = vm.find_snapshot(snapshot_name)
get_snapshots_to_delete(snapshot, protected_snapshots)
TO_DELETE = get_snapshot_children(vm_name, snapshot_name, protected_snapshots)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider reserving upper snake case names for constants

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker


if TO_DELETE:
print(f"\nCleaning {vm_name} 🫧 Snapshots to delete:")
for name, _ in TO_DELETE:
print(f" {name}")
for snapshotid, snapshot_name in TO_DELETE:
print(f" {snapshot_name}")

if vm.state not in (MachineState.powered_off, MachineState.saved):
print(f"\nVM state: {vm.state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")
vm_state = get_vm_state(vm_name)
if vm_state not in ("poweroff", "saved"):
print(f"\nVM state: {vm_state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")

answer = input("\nConfirm deletion ('y'):")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is "y" the default, or is that what you should press to confirm?

from the code i see its what you should press, but i'm afraid that in this format the message suggests "y" is the default. maybe change to "press 'y'".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that the user has to press 'y', as this deletes the snapshots and there is no way to recover them. I agree it would be good to clarify it.

if answer.lower() == "y":
print("\nDeleting... (this may take some time, go for an 🍦!)")
session = vm.create_session()
for name, uuid in TO_DELETE:
for snapshotid, snapshot_name in TO_DELETE[::-1]: # delete in reverse order to avoid issues with child snapshots
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for snapshotid, snapshot_name in TO_DELETE[::-1]: # delete in reverse order to avoid issues with child snapshots
for snapshotid, snapshot_name in reversed(TO_DELETE): # delete in reverse order to avoid issues with child snapshots

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, interesting behavior. i think get_snapshot_children should document that the order of the result has a particular meaning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug in my original script, that @stevemk14ebr corrected here. I think it is a good idea to document this in the get_snapshot_children function. πŸ‘

try:
progress = session.machine.delete_snapshot(uuid)
progress.wait_for_completion(-1)
print(f" 🫧 DELETED '{name}'")
run_vboxmanage(["snapshot", vm_name, "delete", snapshot_name])
print(f" 🫧 DELETED '{snapshot_name}'")
except Exception as e:
print(f" ❌ ERROR '{name}': {e}")
session.unlock_machine()
print(f" ❌ ERROR '{snapshot_name}': {e}")
else:
print(f"\n{vm_name} is clean 🫧")

Expand All @@ -67,10 +112,10 @@ def main(argv=None):

# Delete the 'CLEAN with IDA 8.4' children snapshots recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
# NOTE: the 'CLEAN with IDA 8.4' root snapshot is skipped in this case
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'CLEAN with IDA 8.4'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot CLEAN with IDA 8.4

# Delete the 'Snapshot 3' snapshot and its children recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'Snapshot 3'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot Snapshot 3

# Delete all snapshots in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --protected_snapshots ""
Expand All @@ -84,7 +129,7 @@ def main(argv=None):
parser.add_argument("vm_name", help="Name of the VM to clean up")
parser.add_argument("--root_snapshot", default="", help="Snapshot to delete (and its children recursively). Leave empty to clean all snapshots in the VM.")
parser.add_argument(
"--protected_snapshots",
"--protected_snapshots",
default="clean,done",
type=lambda s: s.split(","),
help='Comma-separated list of strings. Snapshots with any of the strings included in the name (case insensitive) are not deleted. Default: "clean,done"',
Expand All @@ -95,4 +140,4 @@ def main(argv=None):


if __name__ == "__main__":
main()
main()
Loading