From 4a1756b24df438796ef4c38ff2230dcdf2786aa9 Mon Sep 17 00:00:00 2001
From: Florian Paul Azim Hoberg
Date: Wed, 7 Aug 2024 16:12:36 +0200
Subject: [PATCH] feature: Add rolling updates (node auto patching)

Add an optional, FastAPI based ProxLB API that lets nodes signal an
active update mode to each other. When rolling updates are enabled,
a node checks for pending package updates, installs them, excludes
itself from rebalancing, migrates its workloads away and reboots.

Fixes: #39
---
 proxlb | 282 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 246 insertions(+), 36 deletions(-)

diff --git a/proxlb b/proxlb
index fb92547..c96628d 100755
--- a/proxlb
+++ b/proxlb
@@ -22,28 +22,56 @@ import argparse
 import configparser
+try:
+    import fastapi
+    _imports = True
+    _imports_missing = ''
+except ImportError:
+    _imports = False
+    _imports_missing = ' fastapi'
 import json
 import logging
 import os
 try:
     import proxmoxer
-    _imports = True
 except ImportError:
     _imports = False
+    _imports_missing = _imports_missing + ' proxmoxer'
 import random
 import re
 import requests
 import socket
 import sys
 import time
 import urllib3
+try:
+    import uvicorn
+except ImportError:
+    _imports = False
+    _imports_missing = _imports_missing + ' uvicorn'
+from multiprocessing import Process
 
 
 # Constants
 __appname__ = "ProxLB"
 __version__ = "1.1.0b"
 __author__ = "Florian Paul Azim Hoberg @gyptazy"
 __errors__ = False
+
+
+# ProxLB API
+# Guard the module-level app creation so that a missing fastapi package is
+# reported by __validate_imports() instead of raising a NameError on import.
+if _imports:
+    proxlb_api = fastapi.FastAPI()
+    proxlb_api.update_self_status = 0
+
+    @proxlb_api.get("/updates/self/status")
+    async def update_status():
+        return proxlb_api.update_self_status
+
+    @proxlb_api.get("/updates/self/run")
+    async def update_run():
+        proxlb_api.update_self_status = 1
+        return proxlb_api.update_self_status
 
 
 # Classes
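The two endpoints above act as a minimal per-node state flag for the
rolling-update coordination. A quick interactive check against a node
(hypothetical node address; tcp/8008 is the default from the [api]
section introduced further below):

    import requests

    base = 'http://10.10.10.11:8008'                    # hypothetical node IP
    requests.get(f'{base}/updates/self/status').json()  # -> 0 (idle) or 1 (update mode)
    requests.get(f'{base}/updates/self/run').json()     # switch node into update mode -> 1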
""" @@ -125,7 +189,7 @@ def __validate_imports(): info_prefix = 'Info: [python-imports]:' if not _imports: - logging.critical(f'{error_prefix} Could not import all dependencies. Please install "proxmoxer".') + logging.critical(f'{error_prefix} Could not import all dependencies. Please install: {_imports_missing}.') sys.exit(2) else: logging.info(f'{info_prefix} All required dependencies were imported.') @@ -192,6 +256,10 @@ def initialize_config_options(config_path): daemon = config['service'].get('daemon', 1) schedule = config['service'].get('schedule', 24) log_verbosity = config['service'].get('log_verbosity', 'CRITICAL') + # API + proxlb_api_enable = config['api'].get('enable', 0) + proxlb_api_listener = config['api'].get('listener', '0.0.0.0') + proxlb_api_port = config['api'].get('port', 8008) except configparser.NoSectionError: logging.critical(f'{error_prefix} Could not find the required section.') sys.exit(2) @@ -204,7 +272,8 @@ def initialize_config_options(config_path): logging.info(f'{info_prefix} Configuration file loaded.') return proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v, balancing_method, balancing_mode, balancing_mode_option, \ - balancing_type, balanciness, parallel_migrations, ignore_nodes, ignore_vms, master_only, daemon, schedule, log_verbosity + balancing_type, balanciness, parallel_migrations, ignore_nodes, ignore_vms, master_only, daemon, schedule, log_verbosity, proxlb_api_enable, \ + proxlb_api_listener, proxlb_api_port def api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v): @@ -234,6 +303,35 @@ def api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_ap return api_object +def proxlb_api_server(host, port): + """ Start the ProxLB API server. """ + error_prefix = 'Error: [proxlb-api-server]:' + warn_prefix = 'Warning: [proxlb-api-server]:' + info_prefix = 'Info: [proxlb-api-server]:' + + logging.info(f'{info_prefix} Starting ProxLB API on listener {host} and port tcp/{port}') + uvicorn.run(proxlb_api, host=host, port=port) + + +def run_process_function(proxlb_api_enable, host, port, daemon=True): + """ Run a given function threaded. """ + error_prefix = 'Error: [proxlb-process-creator]:' + warn_prefix = 'Warning: [proxlb-process-creator]:' + info_prefix = 'Info: [proxlb-process-creator]:' + + if proxlb_api_enable: + proc = Process(target=proxlb_api_server, args=(host, port), daemon=daemon) + proc.start() + + # Watch the process status + if proc.is_alive(): + logging.info(f'{info_prefix} Process started. Process is a daemon: {daemon}') + else: + logging.critical(f'{error_prefix} Process {target_process} could not be started.') + else: + logging.info(f'{info_prefix} API function is disabled. Do not start ProxLB API.') + + def get_cluster_master(api_object): """ Get the current master of the Proxmox cluster. """ error_prefix = 'Error: [cluster-master-getter]:' @@ -270,6 +368,94 @@ def validate_cluster_master(cluster_master): return True +def get_node_update_status(api_object): + """ Get the current update status of the current executing host node in the cluster. 
""" + info_prefix = 'Info: [node-update-status-getter]:' + error_prefix = 'Error: [node-update-status-getter]:' + + node_executor_hostname = socket.gethostname() + logging.info(f'{info_prefix} Get update status for node: {node_executor_hostname}.') + + try: + update_status_object = api_object.nodes(node_executor_hostname).apt().update.get() + except proxmoxer.core.ResourceException: + logging.critical(f'{info_prefix} Unknown node in cluster: {node_executor_hostname}.') + sys.exit(2) + + if len(update_status_object) > 0: + logging.info(f'{info_prefix} Updates available for node: {node_executor_hostname}.') + return True + else: + logging.info(f'{info_prefix} No updates available for node: {node_executor_hostname}.') + return False + + +def run_node_update(api_object, node_requires_updates): + """ Run the update execution on node. """ + info_prefix = 'Info: [node-update-executor]:' + error_prefix = 'Error: [node-update-executor]:' + + node_executor_hostname = socket.gethostname() + + if node_requires_updates: + logging.info(f'{info_prefix} Execute updates on node: {node_executor_hostname}.') + try: + update_status_object = api_object.nodes(node_executor_hostname).status().post(command='upgrade') + except proxmoxer.core.ResourceException: + logging.critical(f'{error_prefix} Missing API endpoint on node: {node_executor_hostname}. Please make sure to have the package proxlb-additions installed.') + sys.exit(2) + logging.info(f'{info_prefix} Sucessfully integrated updates to node: {node_executor_hostname}.') + + +def extend_ignore_node_list(ignore_nodes): + """ Extend the node ignore list by this node. """ + info_prefix = 'Info: [node-ignore-list-adder]:' + error_prefix = 'Error: [node-ignore-list-adder]:' + + node_executor_hostname = socket.gethostname() + logging.info(f'{info_prefix} Adding node {node_executor_hostname} to ignore list.') + ignore_nodes = ignore_nodes + f',{node_executor_hostname}' + logging.info(f'{info_prefix} Ignored nodes are now: {ignore_nodes}.') + + return ignore_nodes + + +def get_node_reboot_status(): + """ Get the current reboot status of the current executing host node in the cluster. """ + info_prefix = 'Info: [node-reboot-status-getter]:' + error_prefix = 'Error: [node-reboot-status-getter]:' + reboot_status_file = '/var/run/reboot-required' + + node_executor_hostname = socket.gethostname() + logging.info(f'{info_prefix} Get reboot status for node: {node_executor_hostname}.') + + reboot_status_object = os.path.exists(reboot_status_file) + + if reboot_status_object: + logging.info(f'{info_prefix} Reboot required for node: {node_executor_hostname}.') + return True + else: + logging.info(f'{info_prefix} No reboot required for node: {node_executor_hostname}.') + return False + + +def run_node_reboot(api_object, node_requires_reboot): + """ Run the update execution on node. """ + info_prefix = 'Info: [node-reboot-executor]:' + error_prefix = 'Error: [node-reboot-executor]:' + + node_executor_hostname = socket.gethostname() + + if node_requires_reboot: + logging.info(f'{info_prefix} Execute reboot on node: {node_executor_hostname}.') + try: + update_status_object = api_object.nodes(node_executor_hostname).status().post(command='reboot') + except proxmoxer.core.ResourceException: + logging.critical(f'{error_prefix} Missing API endpoint on node: {node_executor_hostname}. 
@@ -404,27 +590,29 @@ def get_vm_statistics(api_object, ignore_vms, balancing_type):
     return vm_statistics
 
 
-def update_node_statistics(node_statistics, vm_statistics):
+def update_node_statistics(node_statistics, vm_statistics, ignore_nodes):
     """ Update node statistics by VMs statistics. """
     info_prefix = 'Info: [node-update-statistics]:'
     warn_prefix = 'Warning: [node-update-statistics]:'
+    ignore_nodes_list = ignore_nodes.split(',')
 
     for vm, vm_value in vm_statistics.items():
-        node_statistics[vm_value['node_parent']]['cpu_assigned'] = node_statistics[vm_value['node_parent']]['cpu_assigned'] + int(vm_value['cpu_total'])
-        node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] = (node_statistics[vm_value['node_parent']]['cpu_assigned'] / node_statistics[vm_value['node_parent']]['cpu_total']) * 100
-        node_statistics[vm_value['node_parent']]['memory_assigned'] = node_statistics[vm_value['node_parent']]['memory_assigned'] + int(vm_value['memory_total'])
-        node_statistics[vm_value['node_parent']]['memory_assigned_percent'] = (node_statistics[vm_value['node_parent']]['memory_assigned'] / node_statistics[vm_value['node_parent']]['memory_total']) * 100
-        node_statistics[vm_value['node_parent']]['disk_assigned'] = node_statistics[vm_value['node_parent']]['disk_assigned'] + int(vm_value['disk_total'])
-        node_statistics[vm_value['node_parent']]['disk_assigned_percent'] = (node_statistics[vm_value['node_parent']]['disk_assigned'] / node_statistics[vm_value['node_parent']]['disk_total']) * 100
+        # Skip VMs on ignored nodes (e.g. a node that is currently in update mode).
+        if vm_value['node_parent'] not in ignore_nodes_list:
+            node_statistics[vm_value['node_parent']]['cpu_assigned'] = node_statistics[vm_value['node_parent']]['cpu_assigned'] + int(vm_value['cpu_total'])
+            node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] = (node_statistics[vm_value['node_parent']]['cpu_assigned'] / node_statistics[vm_value['node_parent']]['cpu_total']) * 100
+            node_statistics[vm_value['node_parent']]['memory_assigned'] = node_statistics[vm_value['node_parent']]['memory_assigned'] + int(vm_value['memory_total'])
+            node_statistics[vm_value['node_parent']]['memory_assigned_percent'] = (node_statistics[vm_value['node_parent']]['memory_assigned'] / node_statistics[vm_value['node_parent']]['memory_total']) * 100
+            node_statistics[vm_value['node_parent']]['disk_assigned'] = node_statistics[vm_value['node_parent']]['disk_assigned'] + int(vm_value['disk_total'])
+            node_statistics[vm_value['node_parent']]['disk_assigned_percent'] = (node_statistics[vm_value['node_parent']]['disk_assigned'] / node_statistics[vm_value['node_parent']]['disk_total']) * 100
 
-        if node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] > 99:
-            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for CPU by {int(node_statistics[vm_value["node_parent"]]["cpu_assigned_percent"])}%.')
+            if node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] > 99:
+                logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for CPU by {int(node_statistics[vm_value["node_parent"]]["cpu_assigned_percent"])}%.')
 
-        if node_statistics[vm_value['node_parent']]['memory_assigned_percent'] > 99:
-            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for memory by {int(node_statistics[vm_value["node_parent"]]["memory_assigned_percent"])}%.')
+            if node_statistics[vm_value['node_parent']]['memory_assigned_percent'] > 99:
+                logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for memory by {int(node_statistics[vm_value["node_parent"]]["memory_assigned_percent"])}%.')
 
-        if node_statistics[vm_value['node_parent']]['disk_assigned_percent'] > 99:
-            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for disk by {int(node_statistics[vm_value["node_parent"]]["disk_assigned_percent"])}%.')
+            if node_statistics[vm_value['node_parent']]['disk_assigned_percent'] > 99:
+                logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for disk by {int(node_statistics[vm_value["node_parent"]]["disk_assigned_percent"])}%.')
 
     logging.info(f'{info_prefix} Updated node resource assignments by all VMs.')
     logging.debug('node_statistics')
@@ -484,7 +672,7 @@ def __get_proxlb_groups(vm_tags):
     return group_include, group_exclude, vm_ignore
 
 
-def balancing_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, rebalance, processed_vms):
+def balancing_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, ignore_nodes, rebalance, processed_vms):
     """ Calculate re-balancing of VMs on present nodes across the cluster. """
     info_prefix = 'Info: [rebalancing-calculator]:'
@@ -501,14 +689,14 @@ def balancing_calculations(balancing_method, balancing_mode, balancing_mode_opti
 
         # Update resource statistics for VMs and nodes.
         node_statistics, vm_statistics = __update_resource_statistics(resources_vm_most_used, resources_node_most_free,
-                                                                      vm_statistics, node_statistics, balancing_method, balancing_mode)
+                                                                      vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes)
 
         # Start recursion until we do not have any needs to rebalance anymore.
-        balancing_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, rebalance, processed_vms)
+        balancing_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, ignore_nodes, rebalance, processed_vms)
 
     # Honour groupings for include and exclude groups for rebalancing VMs.
-    node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
-    node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
+    node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes)
+    node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes)
 
     # Remove VMs that are not being relocated.
     vms_to_remove = [vm_name for vm_name, vm_info in vm_statistics.items() if 'node_rebalance' in vm_info and vm_info['node_rebalance'] == vm_info.get('node_parent')]
@@ -632,11 +820,12 @@ def __get_most_free_resources_node(balancing_method, balancing_mode, balancing_m
     return node
 
 
-def __update_resource_statistics(resource_highest_used_resources_vm, resource_highest_free_resources_node, vm_statistics, node_statistics, balancing_method, balancing_mode):
+def __update_resource_statistics(resource_highest_used_resources_vm, resource_highest_free_resources_node, vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes):
     """ Update VM and node resource statistics. """
     info_prefix = 'Info: [rebalancing-resource-statistics-update]:'
+    ignore_nodes_list = ignore_nodes.split(',')
 
-    if resource_highest_used_resources_vm[1]['node_parent'] != resource_highest_free_resources_node[0]:
+    # Do not relocate VMs whose parent node is on the ignore list (e.g. in update mode).
+    if resource_highest_used_resources_vm[1]['node_parent'] != resource_highest_free_resources_node[0] and resource_highest_used_resources_vm[1]['node_parent'] not in ignore_nodes_list:
         vm_name = resource_highest_used_resources_vm[0]
         vm_node_parent = resource_highest_used_resources_vm[1]['node_parent']
         vm_node_rebalance = resource_highest_free_resources_node[0]
@@ -668,7 +857,7 @@ def __update_resource_statistics(resource_highest_used_resources_vm, resource_hi
     return node_statistics, vm_statistics
 
 
-def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
+def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes):
     """ Get VMs tags for include groups. """
     info_prefix = 'Info: [rebalancing-tags-group-include]:'
     tags_include_vms = {}
@@ -697,13 +886,13 @@ def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_metho
                 vm_node_rebalance = vm_statistics[vm_name]['node_rebalance']
             else:
                 _mocked_vm_object = (vm_name, vm_statistics[vm_name])
-                node_statistics, vm_statistics = __update_resource_statistics(_mocked_vm_object, [vm_node_rebalance], vm_statistics, node_statistics, balancing_method, balancing_mode)
+                node_statistics, vm_statistics = __update_resource_statistics(_mocked_vm_object, [vm_node_rebalance], vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes)
             processed_vm.append(vm_name)
 
     return node_statistics, vm_statistics
 
 
-def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
+def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes):
     """ Get VMs tags for exclude groups. """
     info_prefix = 'Info: [rebalancing-tags-group-exclude]:'
     tags_exclude_vms = {}
@@ -736,7 +925,7 @@ def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_metho
                 random_node = random.choice(list(node_statistics.keys()))
             else:
                 _mocked_vm_object = (vm_name, vm_statistics[vm_name])
-                node_statistics, vm_statistics = __update_resource_statistics(_mocked_vm_object, [random_node], vm_statistics, node_statistics, balancing_method, balancing_mode)
+                node_statistics, vm_statistics = __update_resource_statistics(_mocked_vm_object, [random_node], vm_statistics, node_statistics, balancing_method, balancing_mode, ignore_nodes)
             processed_vm.append(vm_name)
 
     return node_statistics, vm_statistics
@@ -872,13 +1061,16 @@ def main():
 
     # Parse global config.
     proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v, balancing_method, balancing_mode, balancing_mode_option, balancing_type, \
-        balanciness, parallel_migrations, ignore_nodes, ignore_vms, master_only, daemon, schedule, log_verbosity = initialize_config_options(config_path)
+        balanciness, parallel_migrations, ignore_nodes, ignore_vms, master_only, daemon, schedule, log_verbosity, proxlb_api_enable, proxlb_api_listener, proxlb_api_port = initialize_config_options(config_path)
 
     # Overwrite logging handler with user defined log verbosity.
     initialize_logger(log_verbosity, update_log_verbosity=True)
 
+    # ProxLB API Server
+    run_process_function(proxlb_api_enable, proxlb_api_listener, proxlb_api_port)
+
     while True:
-        # API Authentication.
+        # Proxmox API Authentication.
         api_object = api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v)
 
         # Get master node of cluster and ensure that ProxLB is only performed on the
@@ -891,20 +1083,38 @@ def main():
             validate_daemon(daemon, schedule)
             continue
 
+        # Validate for node auto update in cluster for rolling updates.
+        # Note: This requires proxlb-additions with a patched Proxmox API!
+        # TODO: expose rolling_updates as a config option instead of hardcoding it.
+        rolling_updates = 1
+        if bool(int(rolling_updates)) and bool(int(proxlb_api_enable)):
+            node_requires_updates = get_node_update_status(api_object)
+            run_node_update(api_object, node_requires_updates)
+            node_requires_reboot = get_node_reboot_status()
+
+            # Prepare node for reboot by ignoring this node from being a valid source
+            # for rebalancing, set node to active update mode and migrate workloads.
+            if node_requires_reboot:
+                ignore_nodes = extend_ignore_node_list(ignore_nodes)
+                interact_proxlb_api('127.0.0.1', proxlb_api_port, 'get', 'updates/self/run')
+                validate_nodes_update_mode(api_object, proxlb_api_port)
+        else:
+            node_requires_reboot = False
+
         # Get metric & statistics for vms and nodes.
         node_statistics = get_node_statistics(api_object, ignore_nodes)
         vm_statistics = get_vm_statistics(api_object, ignore_vms, balancing_type)
-        node_statistics = update_node_statistics(node_statistics, vm_statistics)
+        node_statistics = update_node_statistics(node_statistics, vm_statistics, ignore_nodes)
 
         # Calculate rebalancing of vms.
         node_statistics_rebalanced, vm_statistics_rebalanced = balancing_calculations(balancing_method, balancing_mode, balancing_mode_option,
-                                                                                      node_statistics, vm_statistics, balanciness, rebalance=False, processed_vms=[])
+                                                                                      node_statistics, vm_statistics, balanciness, ignore_nodes, rebalance=False, processed_vms=[])
 
         # Rebalance vms to new nodes within the cluster.
         run_vm_rebalancing(api_object, vm_statistics_rebalanced, app_args, parallel_migrations)
 
         # Validate for any errors.
-        post_validations()
+        post_validations(api_object, node_requires_reboot)
 
         # Validate daemon service.
         validate_daemon(daemon, schedule)
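As a follow-up to the validate_nodes_update_mode() TODO, the collected
node states could be used to serialize reboots across the cluster. One
possible sketch (a hypothetical helper, not part of this patch; it
assumes interact_proxlb_api() returns the parsed integer status, as in
the version above):

    import time

    def wait_for_exclusive_update_mode(api_object, proxlb_api_port, interval=60):
        """ Block until this node is the only cluster member in update mode. """
        while True:
            nodes_update_status = validate_nodes_update_mode(api_object, proxlb_api_port)
            busy = [name for name, status in nodes_update_status.items() if status == 1]
            # This node itself is already in update mode, so one busy entry is expected.
            if len(busy) <= 1:
                return
            time.sleep(interval)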