Skip to content

Commit

Permalink
MEP-12: Rack spreading (#426)
Browse files Browse the repository at this point in the history
  • Loading branch information
iljarotar authored Aug 4, 2023
1 parent 6e2a248 commit 22cc77c
Show file tree
Hide file tree
Showing 6 changed files with 857 additions and 14 deletions.
148 changes: 140 additions & 8 deletions cmd/metal-api/internal/datastore/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"crypto/rand"
"errors"
"fmt"
"math"
"math/big"

r "gopkg.in/rethinkdb/rethinkdb-go.v6"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"golang.org/x/exp/slices"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

// MachineSearchQuery can be used to search machines.
Expand Down Expand Up @@ -427,7 +428,7 @@ func (rs *RethinkStore) UpdateMachine(oldMachine *metal.Machine, newMachine *met
// FindWaitingMachine returns an available, not allocated, waiting and alive machine of given size within the given partition.
// TODO: the algorithm can be optimized / shortened by using a rethinkdb join command and then using .Sample(1)
// but current implementation should have a slightly better readability.
func (rs *RethinkStore) FindWaitingMachine(partitionid, sizeid string) (*metal.Machine, error) {
func (rs *RethinkStore) FindWaitingMachine(projectid, partitionid, sizeid string, placementTags []string) (*metal.Machine, error) {
q := *rs.machineTable()
q = q.Filter(map[string]interface{}{
"allocation": nil,
Expand Down Expand Up @@ -470,20 +471,151 @@ func (rs *RethinkStore) FindWaitingMachine(partitionid, sizeid string) (*metal.M
return nil, errors.New("no machine available")
}

// pick a random machine from all available ones
var idx int
b, err := rand.Int(rand.Reader, big.NewInt(int64(len(available))))
query := MachineSearchQuery{
AllocationProject: &projectid,
PartitionID: &partitionid,
}

var projectMachines metal.Machines
err = rs.SearchMachines(&query, &projectMachines)
if err != nil {
return nil, err
}
idx = int(b.Uint64())

oldMachine := available[idx]
spreadCandidates := spreadAcrossRacks(available, projectMachines, placementTags)
if len(spreadCandidates) == 0 {
return nil, errors.New("no machine available")
}

oldMachine := spreadCandidates[randomIndex(len(spreadCandidates))]
newMachine := oldMachine
newMachine.PreAllocated = true

err = rs.updateEntity(rs.machineTable(), &newMachine, &oldMachine)
if err != nil {
return nil, err
}

return &newMachine, nil
}

func spreadAcrossRacks(allMachines, projectMachines metal.Machines, tags []string) metal.Machines {
var (
allRacks = groupByRack(allMachines)

projectRacks = groupByRack(projectMachines)
leastOccupiedByProjectRacks = electRacks(allRacks, projectRacks)

taggedMachines = groupByTags(projectMachines).filter(tags...).getMachines()
taggedRacks = groupByRack(taggedMachines)
leastOccupiedByTagsRacks = electRacks(allRacks, taggedRacks)

intersection = intersect(leastOccupiedByTagsRacks, leastOccupiedByProjectRacks)
)

if c := allRacks.filter(intersection...).getMachines(); len(c) > 0 {
return c
}

return allRacks.filter(leastOccupiedByTagsRacks...).getMachines() // tags have precedence over project
}

type groupedMachines map[string]metal.Machines

func (g groupedMachines) getMachines() metal.Machines {
machines := make(metal.Machines, 0)

for id := range g {
machines = append(machines, g[id]...)
}

return machines
}

func (g groupedMachines) filter(keys ...string) groupedMachines {
result := make(groupedMachines)

for i := range keys {
ms, ok := g[keys[i]]
if ok {
result[keys[i]] = ms
}
}

return result
}

func groupByRack(machines metal.Machines) groupedMachines {
racks := make(groupedMachines)

for _, m := range machines {
racks[m.RackID] = append(racks[m.RackID], m)
}

return racks
}

// electRacks returns the least occupied racks from all racks
func electRacks(allRacks, occupiedRacks groupedMachines) []string {
winners := make([]string, 0)
min := math.MaxInt

for id := range allRacks {
if _, ok := occupiedRacks[id]; ok {
continue
}
occupiedRacks[id] = nil
}

for id := range occupiedRacks {
if _, ok := allRacks[id]; !ok {
continue
}

switch {
case len(occupiedRacks[id]) < min:
min = len(occupiedRacks[id])
winners = []string{id}
case len(occupiedRacks[id]) == min:
winners = append(winners, id)
}
}

return winners
}

func groupByTags(machines metal.Machines) groupedMachines {
groups := make(groupedMachines)

for _, m := range machines {
for j := range m.Tags {
ms := groups[m.Tags[j]]
groups[m.Tags[j]] = append(ms, m)
}
}

return groups
}

func randomIndex(max int) int {
if max <= 0 {
return 0
}

b, _ := rand.Int(rand.Reader, big.NewInt(int64(max)))
idx := int(b.Uint64())

return idx
}

func intersect[T comparable](a, b []T) []T {
c := make([]T, 0)

for i := range a {
if slices.Contains(b, a[i]) {
c = append(c, a[i])
}
}

return c
}
Loading

0 comments on commit 22cc77c

Please sign in to comment.