Skip to content

Commit

Permalink
refactor(hetzner): refactored placement group code
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmetzner committed Nov 22, 2024
1 parent 4c37ff3 commit 64495d9
Showing 1 changed file with 41 additions and 70 deletions.
111 changes: 41 additions & 70 deletions cluster-autoscaler/cloudprovider/hetzner/hetzner_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package hetzner

import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
Expand All @@ -28,8 +29,9 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
autoscalerErrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/klog/v2"
)
Expand All @@ -45,6 +47,7 @@ const (
serverCreateTimeoutDefault = 5 * time.Minute
serverRegisterTimeout = 10 * time.Minute
defaultPodAmountsLimit = 110
maxPlacementGroupSize = 10
)

// HetznerCloudProvider implements CloudProvider interface.
Expand Down Expand Up @@ -107,7 +110,7 @@ func (d *HetznerCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {

// Pricing returns pricing model for this cloud provider or error if not
// available. Implementation optional.
func (d *HetznerCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
func (d *HetznerCloudProvider) Pricing() (cloudprovider.PricingModel, autoscalerErrors.AutoscalerError) {
return nil, cloudprovider.ErrNotImplemented
}

Expand Down Expand Up @@ -179,31 +182,6 @@ func (d *HetznerCloudProvider) Refresh() error {
return nil
}

// Check if any defined placement groups could potentially have more than the maximum allowed number of nodes
func getLargePlacementGroups(nodeGroups map[string]*hetznerNodeGroup, threshold int) []int64 {
placementGroupTotals := make(map[int64]int)

// Calculate totals for each placement group
for _, nodeGroup := range nodeGroups {
if nodeGroup.placementGroup == nil || nodeGroup.placementGroup.ID == 0 {
continue
}

placementGroupID := nodeGroup.placementGroup.ID
placementGroupTotals[placementGroupID] += nodeGroup.maxSize
}

// Collect placement groups with total maxSize > threshold
var largePlacementGroupIDs []int64
for id, totalMaxSize := range placementGroupTotals {
if totalMaxSize > threshold {
largePlacementGroupIDs = append(largePlacementGroupIDs, id)
}
}

return largePlacementGroupIDs
}

// BuildHetzner builds the Hetzner cloud provider.
func BuildHetzner(_ config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
manager, err := newManager()
Expand All @@ -222,6 +200,7 @@ func BuildHetzner(_ config.AutoscalingOptions, do cloudprovider.NodeGroupDiscove

validNodePoolName := regexp.MustCompile(`^[a-z0-9A-Z]+[a-z0-9A-Z\-\.\_]*[a-z0-9A-Z]+$|^[a-z0-9A-Z]{1}$`)
clusterUpdateLock := sync.Mutex{}
placementGroupTotals := make(map[string]int)
for _, nodegroupSpec := range do.NodeGroupSpecs {
spec, err := createNodePoolSpec(nodegroupSpec)
if err != nil {
Expand All @@ -234,11 +213,19 @@ func BuildHetzner(_ config.AutoscalingOptions, do cloudprovider.NodeGroupDiscove
klog.Fatalf("Failed to get servers for for node pool %s error: %v", nodegroupSpec, err)
}

var placementGroup *hcloud.PlacementGroup
if manager.clusterConfig.IsUsingNewFormat {
_, ok := manager.clusterConfig.NodeConfigs[spec.name]
if !ok {
klog.Fatalf("No node config present for node group id `%s` error: %v", spec.name, err)
}

placementGroupRef := manager.clusterConfig.NodeConfigs[spec.name].PlacementGroup

if placementGroupRef != "" {
placementGroup = getPlacementGroup(manager, placementGroupRef)
placementGroupTotals[placementGroup.Name] += spec.maxSize
}
}

manager.nodeGroups[spec.name] = &hetznerNodeGroup{
Expand All @@ -250,61 +237,45 @@ func BuildHetzner(_ config.AutoscalingOptions, do cloudprovider.NodeGroupDiscove
region: strings.ToLower(spec.region),
targetSize: len(servers),
clusterUpdateMutex: &clusterUpdateLock,
placementGroup: placementGroup,
}
}

// If a placement group was specified, check with the API to see if it exists
if manager.clusterConfig.IsUsingNewFormat {

placementGroupRef := manager.clusterConfig.NodeConfigs[spec.name].PlacementGroup

if placementGroupRef == "" {
continue
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

placementGroup, _, err := manager.client.PlacementGroup.Get(ctx, placementGroupRef)

// Check if an error occurred
if err != nil {
if err == context.DeadlineExceeded {
klog.Fatalf("Timed out checking if placement group `%s` exists.", placementGroupRef)
} else {
klog.Fatalf("Failed to verify if placement group `%s` exists error: %v", placementGroupRef, err)
}
}

// If the placement group exists, add it to the node group config
if placementGroup != nil {
manager.nodeGroups[spec.name].placementGroup = placementGroup
} else {
klog.Fatalf("The requested placement group `%s` does not appear to exist.", placementGroupRef)
}
// Check if placement groups spanned over multiple node groups exceeds max placement group size
for pgName, totalMaxSize := range placementGroupTotals {
if totalMaxSize > maxPlacementGroupSize {
klog.Fatalf(
"Placement group %s exceeds max placement group size of %d, size %d",
pgName,
maxPlacementGroupSize,
totalMaxSize,
)
}
}

// Get placement groups with total maxSize over the maximum allowed
maxPlacementGroupSize := 10
return provider
}

largePlacementGroups := getLargePlacementGroups(manager.nodeGroups, maxPlacementGroupSize)
func getPlacementGroup(manager *hetznerManager, placementGroupRef string) *hcloud.PlacementGroup {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Fail if we have placement groups over the max size
if len(largePlacementGroups) > 0 {
placementGroup, _, err := manager.client.PlacementGroup.Get(ctx, placementGroupRef)

// Gather placement group names
var placementGroupIDs string
for i, placementGroupID := range largePlacementGroups {
if i > 0 {
placementGroupIDs += ", "
}
placementGroupIDs += strconv.FormatInt(placementGroupID, 10)
// Check if an error occurred
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
klog.Fatalf("Timed out checking if placement group `%s` exists.", placementGroupRef)
} else {
klog.Fatalf("Failed to verify if placement group `%s` exists. Error: %v", placementGroupRef, err)
}
}

klog.Fatalf("The following placement groups have a potential size over the allowed maximum of %d: %s.", maxPlacementGroupSize, placementGroupIDs)
if placementGroup == nil {
klog.Fatalf("The requested placement group `%s` does not appear to exist.", placementGroupRef)
}

return provider
return placementGroup
}

func createNodePoolSpec(groupSpec string) (*hetznerNodeGroupSpec, error) {
Expand Down

0 comments on commit 64495d9

Please sign in to comment.