fix: Fix anti-affinity rules not evaluating a new and different node correctly.

Fixes: #67
Fixes: #71
gyptazy committed Sep 2, 2024
1 parent 7ddb7ca commit a8115e1
Showing 3 changed files with 92 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .changelogs/1.0.3/67_fix_anti_affinity_rules.yml
@@ -0,0 +1,2 @@
fixed:
- Fix anti-affinity rules not evaluating a new and different node. [#67]
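Note: the anti-affinity fix applies to VMs that share a tag beginning with plb_antiaffinity_, as the __get_proxlb_groups hunk further down shows. A hedged sketch of how such a tag could be attached to two VMs through the Proxmox API with proxmoxer so that ProxLB spreads them onto different nodes; host, credentials, node name and VM IDs are placeholders, not values from this commit:

# Illustrative only: attach the same anti-affinity tag to two VMs via proxmoxer.
# Host, credentials, node and VM IDs are placeholders, not taken from this commit.
from proxmoxer import ProxmoxAPI

api = ProxmoxAPI('pve01.example.com', user='root@pam', password='secret', verify_ssl=False)

for vmid in (101, 102):
    # VMs sharing a plb_antiaffinity_* tag should end up on different nodes after balancing.
    api.nodes('pve01').qemu(vmid).config.put(tags='plb_antiaffinity_database')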
2 changes: 2 additions & 0 deletions .changelogs/1.0.3/71_fix_ignore_vm_node_handling_if_unset.yml
@@ -0,0 +1,2 @@
fixed:
- Fix handling of unset `ignore_nodes` and `ignore_vms`, which resulted in an attribute error. [#71]
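Note: the first hunk of the proxlb diff below changes the fallbacks for ignore_nodes and ignore_vms from None to an empty string, because the value is later passed to str.split(',') in get_vm_statistics. A minimal sketch of the failure mode, assuming ProxLB reads its configuration with the standard-library configparser (section and option names are taken from the hunk below):

# Minimal sketch of the bug behind #71, assuming configparser is used to read the config.
import configparser

config = configparser.ConfigParser()
config.read_string("[vm_balancing]\nenable: 1\n")  # ignore_nodes / ignore_vms are not set

# Old behaviour: fallback None, so the later .split(',') raises an AttributeError.
ignore_vms = config['vm_balancing'].get('ignore_vms', None)
try:
    ignore_vms.split(',')
except AttributeError as err:
    print(f'AttributeError: {err}')  # 'NoneType' object has no attribute 'split'

# New behaviour: fallback '', splitting yields [''] and the ignore list is simply empty.
ignore_vms = config['vm_balancing'].get('ignore_vms', '')
print(ignore_vms.split(','))  # ['']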
123 changes: 88 additions & 35 deletions proxlb
@@ -246,8 +246,9 @@ def initialize_config_options(config_path):
proxlb_config['vm_balancing_type'] = config['vm_balancing'].get('type', 'vm')
proxlb_config['vm_balanciness'] = config['vm_balancing'].get('balanciness', 10)
proxlb_config['vm_parallel_migrations'] = config['vm_balancing'].get('parallel_migrations', 1)
proxlb_config['vm_ignore_nodes'] = config['vm_balancing'].get('ignore_nodes', None)
proxlb_config['vm_ignore_vms'] = config['vm_balancing'].get('ignore_vms', None)
proxlb_config['vm_ignore_nodes'] = config['vm_balancing'].get('ignore_nodes', '')
proxlb_config['vm_ignore_vms'] = config['vm_balancing'].get('ignore_vms', '')
proxlb_config['vm_enforce_affinity_groups'] = config['vm_balancing'].get('enforce_affinity_groups', 1)
# Storage Balancing
proxlb_config['storage_balancing_enable'] = config['storage_balancing'].get('enable', 0)
proxlb_config['storage_balancing_method'] = config['storage_balancing'].get('method', 'disk_space')
@@ -504,7 +505,7 @@ def get_vm_statistics(api_object, ignore_vms, balancing_type):
info_prefix = 'Info: [vm-statistics]:'
warn_prefix = 'Warn: [vm-statistics]:'
vm_statistics = {}
ignore_vms_list = ignore_vms.split(',')
ignore_vms_list = ignore_vms.split(',')
group_include = None
group_exclude = None
vm_ignore = None
@@ -751,7 +752,10 @@ def __get_vm_tags(api_object, node, vmid, balancing_type):
if balancing_type == 'ct':
vm_config = api_object.nodes(node['node']).lxc(vmid).config.get()

logging.info(f'{info_prefix} Got VM/CT tag from API.')
if vm_config.get("tags", None) is None:
logging.info(f'{info_prefix} Got no VM/CT tag for VM {vm_config.get("name", None)} from API.')
else:
logging.info(f'{info_prefix} Got VM/CT tag {vm_config.get("tags", None)} for VM {vm_config.get("name", None)} from API.')
return vm_config.get('tags', None)


@@ -769,8 +773,16 @@ def __get_proxlb_groups(vm_tags):
logging.info(f'{info_prefix} Got PLB include group.')
group_include = group

if group.startswith('plb_exclude_'):
if group.startswith('plb_affinity_'):
logging.info(f'{info_prefix} Got PLB include group.')
group_include = group

if group.startswith('plb_exclude_'):
logging.info(f'{info_prefix} Got PLB exclude group.')
group_exclude = group

if group.startswith('plb_antiaffinity_'):
logging.info(f'{info_prefix} Got PLB exclude group.')
group_exclude = group

if group.startswith('plb_ignore_vm'):
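Note: the hunk above extends __get_proxlb_groups so that, besides the existing plb_include_/plb_exclude_ prefixes, tags starting with plb_affinity_ are treated as include (affinity) groups and tags starting with plb_antiaffinity_ as exclude (anti-affinity) groups. A condensed sketch of that mapping; the helper name and sample tags are illustrative, not part of the commit:

# Illustrative condensation of the prefix handling added above; not the commit's exact code.
def classify_plb_tag(tag):
    if tag.startswith(('plb_include_', 'plb_affinity_')):
        return 'include'   # VMs in the same group should be kept together
    if tag.startswith(('plb_exclude_', 'plb_antiaffinity_')):
        return 'exclude'   # VMs in the same group should be spread across nodes
    if tag.startswith('plb_ignore_vm'):
        return 'ignore'    # VM appears to be skipped during balancing (branch body not shown in this hunk)
    return None

print(classify_plb_tag('plb_antiaffinity_database'))  # exclude
print(classify_plb_tag('plb_affinity_webstack'))      # include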
@@ -811,9 +823,9 @@ def balancing_vm_calculations(balancing_method, balancing_mode, balancing_mode_o
logging.info(f'{info_prefix} Best next node for VM & CT placement: {best_next_node[0]}')
sys.exit(0)

# Honour groupings for include and exclude groups for rebalancing VMs.
node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
# # Honour groupings for include and exclude groups for rebalancing VMs.
# node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
# node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)

logging.info(f'{info_prefix} Balancing calculations done.')
return node_statistics, vm_statistics
@@ -947,7 +959,7 @@ def __update_vm_resource_statistics(resource_highest_used_resources_vm, resource
# Assign new rebalance node to vm
vm_statistics[vm_name]['node_rebalance'] = vm_node_rebalance

logging.info(f'Moving {vm_name} from {vm_node_parent} to {vm_node_rebalance}')
logging.info(f'{info_prefix} Moving {vm_name} from {vm_node_parent} to {vm_node_rebalance}')

# Recalculate values for nodes
## Add freed resources to old parent node
@@ -1005,43 +1017,73 @@ def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_metho

def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
""" Get VMs tags for exclude groups. """
info_prefix = 'Info: [rebalancing-tags-group-exclude]:'
info_prefix = 'Info: [rebalancing-tags-group-exclude]:'
tags_exclude_vms = {}
processed_vm = []

# Create groups of tags with belongings hosts.
for vm_name, vm_values in vm_statistics.items():
if vm_values.get('group_include', None):
if not tags_exclude_vms.get(vm_values['group_include'], None):
tags_exclude_vms[vm_values['group_include']] = [vm_name]
if vm_values.get('group_exclude', None):
if not tags_exclude_vms.get(vm_values['group_exclude'], None):
tags_exclude_vms[vm_values['group_exclude']] = {}
tags_exclude_vms[vm_values['group_exclude']]['nodes_used'] = []
tags_exclude_vms[vm_values['group_exclude']]['nodes_used'].append(vm_statistics[vm_name]['node_rebalance'])
tags_exclude_vms[vm_values['group_exclude']]['vms'] = [vm_name]
else:
tags_exclude_vms[vm_values['group_include']] = tags_exclude_vms[vm_values['group_include']] + [vm_name]
tags_exclude_vms[vm_values['group_exclude']]['vms'] = tags_exclude_vms[vm_values['group_exclude']]['vms'] + [vm_name]
tags_exclude_vms[vm_values['group_exclude']]['nodes_used'].append(vm_statistics[vm_name]['node_rebalance'])

# Update the VMs to the corresponding node to their group assignments.
for group, vm_names in tags_exclude_vms.items():
# Do not take care of tags that have only a single host included.
if len(vm_names) < 2:
logging.info(f'{info_prefix} Only one host in group assignment.')
return node_statistics, vm_statistics
# Evaluate all VMs assigned for each exclude groups and validate that they will be moved to another random node.
# However, if there are still more VMs than nodes we need to deal with it.
for exclude_group, group_values in tags_exclude_vms.items():

group_values['nodes_used'] = []
for vm in group_values['vms']:

proceed = True
counter = 0

while proceed:

if vm_statistics[vm]['node_rebalance'] in group_values['nodes_used']:
# Find another possible new target node if possible by randomly get any node from
# the cluster and validating if this is already used for this anti-affinity group.
logging.info(f'{info_prefix} Rebalancing of VM {vm} is needed due to anti-affinity group policy.')
random_node, counter, proceed = __get_random_node(counter, node_statistics, vm)

if random_node not in group_values['nodes_used']:
logging.info(f'{info_prefix} New random node {random_node} has not yet been used for the anti-affinity group {exclude_group}.')
group_values['nodes_used'].append(random_node)
logging.info(f'{info_prefix} New random node {random_node} has been added as an already used node to the anti-affinity group {exclude_group}.')
logging.info(f'{info_prefix} VM {vm} switched node from {vm_statistics[vm]["node_rebalance"]} to {random_node} due to the anti-affinity group {exclude_group}.')
vm_statistics[vm]['node_rebalance'] = random_node

vm_node_rebalance = False
logging.info(f'{info_prefix} Create exclude groups of VM hosts.')
for vm_name in vm_names:
if vm_name not in processed_vm:
if not vm_node_rebalance:
random_node = vm_statistics[vm_name]['node_parent']
# Get a random node and make sure that it is not by accident the
# currently assigned one.
while random_node == vm_statistics[vm_name]['node_parent']:
random_node = random.choice(list(node_statistics.keys()))
else:
_mocked_vm_object = (vm_name, vm_statistics[vm_name])
node_statistics, vm_statistics = __update_vm_resource_statistics(_mocked_vm_object, [random_node], vm_statistics, node_statistics, balancing_method, balancing_mode)
processed_vm.append(vm_name)
# Add the used node to the list for the anti-affinity group to ensure no
# other VM with the same anti-affinity group will use it (if possible).
logging.info(f'{info_prefix} Node {vm_statistics[vm]["node_rebalance"]} has been added as an already used node to the anti-affinity group {exclude_group}.')
logging.info(f'{info_prefix} No rebalancing for VM {vm} needed due to any anti-affinity group policies.')
group_values['nodes_used'].append(vm_statistics[vm]['node_rebalance'])
proceed = False

return node_statistics, vm_statistics


def __get_random_node(counter, node_statistics, vm):
""" Get a random node within the Proxmox cluster. """
warning_prefix = 'Warning: [random-node-getter]:'
info_prefix = 'Info: [random-node-getter]:'

counter = counter + 1
random_node = None
if counter < 30:
random_node = random.choice(list(node_statistics.keys()))
logging.info(f'{info_prefix} New random node {random_node} evaluated for vm {vm} in run {counter}.')
return random_node, counter, False
else:
logging.warning(f'{warning_prefix} Reached limit for random node evaluation for vm {vm}. Unable to find a suitable new node.')
return random_node, counter, False


def __wait_job_finalized(api_object, node_name, job_id, counter):
""" Wait for a job to be finalized. """
error_prefix = 'Error: [job-status-getter]:'
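Note: the new __get_random_node helper above caps the random search at 30 attempts and is driven by the while proceed: loop in __get_vm_tags_exclude_groups: as long as a VM's planned target node is already taken by its anti-affinity group, another node is drawn at random until an unused node is found or the attempt limit is reached. A standalone sketch of that retry pattern, assuming the same 30-attempt limit; names are illustrative:

import random

def pick_unused_node(nodes, nodes_used, max_attempts=30):
    # Illustrative retry loop mirroring the commit's pattern: draw random nodes until
    # one that the anti-affinity group has not used yet is found, or give up.
    for _ in range(max_attempts):
        candidate = random.choice(nodes)
        if candidate not in nodes_used:
            return candidate
    return None  # mirrors the warning path after the attempt limit is exhausted

nodes = ['node01', 'node02', 'node03']
print(pick_unused_node(nodes, nodes_used=['node01']))  # node02 or node03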
@@ -1101,7 +1143,10 @@ def __run_vm_rebalancing(api_object, _vm_vm_statistics, app_args, parallel_migra
logging.info(f'{info_prefix} Rebalancing will be performed parallely.')

else:
logging.info(f'{info_prefix} No rebalancing needed.')
if app_args.dry_run:
logging.info(f'{info_prefix} Running in dry run mode. Not executing any balancing.')
else:
logging.info(f'{info_prefix} No rebalancing needed.')

return _vm_vm_statistics

@@ -1389,6 +1434,13 @@ def size_in_bytes(size_str):
return size_value * size_multipliers.get(size_unit, 1)


def balancing_vm_affinity_groups(node_statistics, vm_statistics, balancing_method, balancing_mode):
""" Enforce (anti-)affinity groups for further VM movement across the cluster. """
node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
return node_statistics, vm_statistics


def main():
""" Run ProxLB for balancing VM workloads across a Proxmox cluster. """
vm_output_statistics = {}
@@ -1430,6 +1482,7 @@ def main():
# Execute VM/CT balancing sub-routines.
if proxlb_config['vm_balancing_enable'] or app_args.best_node:
node_statistics, vm_statistics = balancing_vm_calculations(proxlb_config['vm_balancing_method'], proxlb_config['vm_balancing_mode'], proxlb_config['vm_balancing_mode_option'], node_statistics, vm_statistics, proxlb_config['vm_balanciness'], app_args, rebalance=False, processed_vms=[])
node_statistics, vm_statistics = balancing_vm_affinity_groups(node_statistics, vm_statistics, proxlb_config['vm_balancing_method'], proxlb_config['vm_balancing_mode'],)
vm_output_statistics = run_rebalancing(api_object, vm_statistics, app_args, proxlb_config['vm_parallel_migrations'], 'vm')

# Execute storage balancing sub-routines.
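Note: taken together, the changes above move the (anti-)affinity handling out of balancing_vm_calculations (where the two __get_vm_tags_*_groups calls are now commented out) into the new balancing_vm_affinity_groups wrapper, which main() calls as a separate step between the balancing calculations and the actual migrations. A self-contained sketch of that reordered flow; the three functions are trivial stand-ins and only the call order mirrors the commit:

# Simplified, runnable sketch of the VM-balancing flow after this commit.
def balancing_vm_calculations(node_statistics, vm_statistics):
    return node_statistics, vm_statistics  # stand-in: resource-based placement proposal

def balancing_vm_affinity_groups(node_statistics, vm_statistics):
    return node_statistics, vm_statistics  # stand-in: enforce (anti-)affinity groups (new step)

def run_rebalancing(vm_statistics):
    return {}  # stand-in: perform (or dry-run) the migrations

node_statistics, vm_statistics = {}, {}
node_statistics, vm_statistics = balancing_vm_calculations(node_statistics, vm_statistics)
node_statistics, vm_statistics = balancing_vm_affinity_groups(node_statistics, vm_statistics)
vm_output_statistics = run_rebalancing(vm_statistics)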
