Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auto-scaling available machine pool size #356

Open
wants to merge 56 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
709fba2
compare number of waiting machines to partition pool size and react a…
iljarotar Nov 8, 2022
72197ff
poolsize validation, better db queries, pass function to fsm
iljarotar Nov 22, 2022
4e72c2a
pool scaler package
iljarotar Nov 24, 2022
b7fceb3
test adjust number of waiting machines function
iljarotar Nov 24, 2022
6105eb7
check if pool scaling is disabled
iljarotar Nov 29, 2022
489f657
min and max waiting pool size; validation; tests
iljarotar Dec 2, 2022
50b5771
linter
iljarotar Dec 2, 2022
ec55300
spec
iljarotar Dec 2, 2022
086aea2
Update cmd/metal-api/internal/scaler/poolscaler.go
iljarotar Dec 6, 2022
df0da91
Update cmd/metal-api/internal/scaler/poolscaler.go
iljarotar Dec 6, 2022
a8fbaed
Update cmd/metal-api/internal/scaler/poolscaler.go
iljarotar Dec 6, 2022
beefa8d
scaler range type for partition and noop state for fsm
iljarotar Dec 6, 2022
4d400cf
linter
iljarotar Dec 6, 2022
28473cd
fix pool size validation
iljarotar Dec 6, 2022
f3f3e1f
fix calculatePoolSizeExcess function
iljarotar Dec 6, 2022
79d82ce
spec
iljarotar Dec 6, 2022
9faf66c
remove scaler range from partition; constructor for scaler range
iljarotar Dec 8, 2022
d1145fa
fix nil pointer
iljarotar Dec 8, 2022
845a814
spec
iljarotar Dec 8, 2022
ad55fa0
OnTransition -> OnEnter
iljarotar Dec 8, 2022
7951390
scaler range in partitionbase
iljarotar Dec 8, 2022
4efb66e
scaler range in NewPartitionResponse
iljarotar Dec 8, 2022
3b8cd78
partitionid and sizeid added to manager
iljarotar Dec 8, 2022
06f269e
fix state value query filter
iljarotar Dec 8, 2022
7f6a8c2
simplify scaler range
iljarotar Dec 9, 2022
e0dea0c
log error from pool scaler instead of returning it
iljarotar Dec 13, 2022
031b2be
assert expectations in poolscaler test
iljarotar Dec 13, 2022
5fd6fb4
Merge remote-tracking branch 'origin/master' into machine-pool-auto-s…
Gerrit91 Jan 10, 2023
e24bf71
log if no scaling required
iljarotar Jan 13, 2023
c018293
Merge branch 'machine-pool-auto-scaling' of github.com:metal-stack/me…
iljarotar Jan 13, 2023
ada1bbc
named log
iljarotar Jan 13, 2023
e8e6513
no scaling on alive events
iljarotar Jan 19, 2023
f0841a2
set machine state to available if shut down machine is powered on again
iljarotar Jan 19, 2023
26d97a8
Merge branch 'master' of github.com:metal-stack/metal-api into machin…
iljarotar Jan 19, 2023
fea48f0
do not check machine liveliness, if machine is in shutdown state
iljarotar Jan 20, 2023
65e2cf3
add sleeping flag to MachineState and remove SHUTDOWN MState
iljarotar Jan 31, 2023
5fbd639
Merge branch 'master' of github.com:metal-stack/metal-api into machin…
iljarotar Feb 21, 2023
2b607d8
remove rand.Seed
iljarotar Feb 21, 2023
01e00d2
Merge branch 'master' of github.com:metal-stack/metal-api into machin…
iljarotar Feb 24, 2023
7d6afb5
Merge branch 'master' of github.com:metal-stack/metal-api into machin…
iljarotar Mar 2, 2023
aa035a3
hibernation struct
iljarotar Mar 3, 2023
a4d4507
Save liveliness properly and fix linting issue. (#427)
Gerrit91 Mar 7, 2023
b0ba295
Fix integration test (#429)
Gerrit91 Mar 7, 2023
a944273
Merge branch 'master' of github.com:metal-stack/metal-api into machin…
iljarotar Mar 7, 2023
049abbf
Do not use pool scaler in early machine lifecycle. (#430)
Gerrit91 Mar 7, 2023
0c633ef
liveliness typo
iljarotar Mar 7, 2023
1d72977
add waiting and preallocated fields to waiting machine query
iljarotar Mar 9, 2023
cfd3e94
do not update machine liveliness while machine is shutting down
iljarotar Mar 9, 2023
f8e4515
fix nil pointer
iljarotar Mar 9, 2023
9e42317
Add Dockerfile.dev to fasten up dev cycles. (#431)
Gerrit91 Mar 9, 2023
5a55a78
Merge remote-tracking branch 'origin/master' into machine-pool-auto-s…
Gerrit91 Aug 1, 2023
4809b4b
Merge master.
Gerrit91 Aug 1, 2023
6dfbda1
Merge branch 'master' into machine-pool-auto-scaling
iljarotar Aug 4, 2023
4673673
Merge branch 'master' into machine-pool-auto-scaling
iljarotar Jan 25, 2024
fe95940
remove Is functions
iljarotar Jan 25, 2024
634d513
Merge branch 'master' into machine-pool-auto-scaling
Gerrit91 Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ visualize-fsm:
cd cmd/metal-api/internal/tools/visualize_fsm
go run main.go
dot -Tsvg fsm.dot > fsm.svg

.PHONY: mocks
mocks:
docker run --user $$(id -u):$$(id -g) --rm -w /work -v ${PWD}:/work vektra/mockery:v2.14.0 --name MachineManager --dir /work/cmd/metal-api/internal/scaler --output /work/cmd/metal-api/internal/scaler --filename pool_scaler_mock_test.go --testonly --inpackage
36 changes: 31 additions & 5 deletions cmd/metal-api/internal/datastore/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package datastore

import (
"github.com/metal-stack/metal-api/cmd/metal-api/internal/fsm"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/fsm/states"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
s "github.com/metal-stack/metal-api/cmd/metal-api/internal/scaler"
"github.com/metal-stack/metal-lib/bus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -37,22 +40,42 @@ func (rs *RethinkStore) CreateProvisioningEventContainer(ec *metal.ProvisioningE
func (rs *RethinkStore) UpsertProvisioningEventContainer(ec *metal.ProvisioningEventContainer) error {
return rs.upsertEntity(rs.eventTable(), ec)
}
func (rs *RethinkStore) ProvisioningEventForMachine(log *zap.SugaredLogger, event *metal.ProvisioningEvent, machineID string) (*metal.ProvisioningEventContainer, error) {
ec, err := rs.FindProvisioningEventContainer(machineID)
func (rs *RethinkStore) ProvisioningEventForMachine(log *zap.SugaredLogger, publisher bus.Publisher, event *metal.ProvisioningEvent, machine *metal.Machine) (*metal.ProvisioningEventContainer, error) {
ec, err := rs.FindProvisioningEventContainer(machine.ID)
if err != nil && !metal.IsNotFound(err) {
return nil, err
}

if ec == nil {
ec = &metal.ProvisioningEventContainer{
Base: metal.Base{
ID: machineID,
ID: machine.ID,
},
Liveliness: metal.MachineLivelinessAlive,
}
}

newEC, err := fsm.HandleProvisioningEvent(log, ec, event)
p, err := rs.FindPartition(machine.PartitionID)
if err != nil {
return nil, err
}

manager := &manager{
rs: rs,
publisher: publisher,
}
scaler := s.NewPoolScaler(log, manager)

config := states.StateConfig{
Log: log,
Container: ec,
Event: event,
Scaler: scaler,
Partition: p,
Gerrit91 marked this conversation as resolved.
Show resolved Hide resolved
Machine: machine,
}

newEC, err := fsm.HandleProvisioningEvent(&config)
if err != nil {
return nil, err
}
Expand All @@ -62,7 +85,10 @@ func (rs *RethinkStore) ProvisioningEventForMachine(log *zap.SugaredLogger, even
}

newEC.TrimEvents(100)

err = rs.UpsertProvisioningEventContainer(newEC)
if err != nil {
return nil, err
}

return newEC, err
}
1 change: 1 addition & 0 deletions cmd/metal-api/internal/datastore/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type MachineSearchQuery struct {

// state
StateValue *string `json:"state_value" optional:"true"`
Waiting *bool `json:"waiting" optional:"true"`
Gerrit91 marked this conversation as resolved.
Show resolved Hide resolved

// ipmi
IpmiAddress *string `json:"ipmi_address" optional:"true"`
Expand Down
105 changes: 105 additions & 0 deletions cmd/metal-api/internal/datastore/poolsize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package datastore

import (
e "github.com/metal-stack/metal-api/cmd/metal-api/internal/eventbus"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-lib/bus"
"go.uber.org/zap"
)

type manager struct {
rs *RethinkStore
publisher bus.Publisher
partitionid string
sizeid string
}

func (m *manager) AllMachines() (metal.Machines, error) {
q := MachineSearchQuery{
PartitionID: &m.partitionid,
SizeID: &m.sizeid,
}

allMachines := metal.Machines{}
err := m.rs.SearchMachines(&q, &allMachines)
if err != nil {
return nil, err
}

return allMachines, nil
}

func (m *manager) WaitingMachines() (metal.Machines, error) {
stateValue := string(metal.AvailableState)
waiting := true
q := MachineSearchQuery{
PartitionID: &m.partitionid,
SizeID: &m.sizeid,
StateValue: &stateValue,
Waiting: &waiting,
Gerrit91 marked this conversation as resolved.
Show resolved Hide resolved
}

waitingMachines := metal.Machines{}
err := m.rs.SearchMachines(&q, &waitingMachines)
if err != nil {
return nil, err
}

return waitingMachines, nil
}

func (m *manager) ShutdownMachines() (metal.Machines, error) {
stateValue := string(metal.ShutdownState)
q := MachineSearchQuery{
PartitionID: &m.partitionid,
SizeID: &m.sizeid,
StateValue: &stateValue,
}

shutdownMachines := metal.Machines{}
err := m.rs.SearchMachines(&q, &shutdownMachines)
if err != nil {
return nil, err
}

return shutdownMachines, nil
}

func (m *manager) Shutdown(machine *metal.Machine) error {
state := metal.MachineState{
Value: metal.ShutdownState,
Description: "shut down as exceeding maximum partition poolsize",
iljarotar marked this conversation as resolved.
Show resolved Hide resolved
}

err := m.rs.publishCommandAndUpdate(m.rs.log, machine, m.publisher, metal.MachineOnCmd, state)
Gerrit91 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
return nil
}

func (m *manager) PowerOn(machine *metal.Machine) error {
state := metal.MachineState{Value: metal.AvailableState}

err := m.rs.publishCommandAndUpdate(m.rs.log, machine, m.publisher, metal.MachineOnCmd, state)
if err != nil {
return err
}
return nil
}

func (rs *RethinkStore) publishCommandAndUpdate(logger *zap.SugaredLogger, m *metal.Machine, publisher bus.Publisher, cmd metal.MachineCommand, state metal.MachineState) error {
newMachine := *m
newMachine.State = state
err := rs.UpdateMachine(m, &newMachine)
if err != nil {
return err
}

err = e.PublishMachineCmd(logger, m, publisher, cmd)
if err != nil {
return err
}

return nil
}
19 changes: 19 additions & 0 deletions cmd/metal-api/internal/eventbus/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,22 @@ func (n NSQClient) createTopics(partitions metal.Partitions, topics []metal.NSQT
func (n NSQClient) delay() {
time.Sleep(nsqdRetryDelay)
}

func PublishMachineCmd(logger *zap.SugaredLogger, m *metal.Machine, publisher bus.Publisher, cmd metal.MachineCommand) error {
evt := metal.MachineEvent{
Type: metal.COMMAND,
Cmd: &metal.MachineExecCommand{
Command: cmd,
TargetMachineID: m.ID,
IPMI: &m.IPMI,
},
}

logger.Infow("publish event", "event", evt, "command", *evt.Cmd)
err := publisher.Publish(metal.TopicMachine.GetFQN(m.PartitionID), evt)
if err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions cmd/metal-api/internal/fsm/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,8 @@ func eventCallbacks(config *states.StateConfig) fsm.Callbacks {
callbacks["enter_"+name] = state.OnTransition
}

waiting := allStates[states.Waiting.String()].(*states.WaitingState)
Gerrit91 marked this conversation as resolved.
Show resolved Hide resolved
callbacks["leave_"+states.Waiting.String()] = waiting.OnLeave

return callbacks
}
47 changes: 20 additions & 27 deletions cmd/metal-api/internal/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,58 @@ import (
"github.com/looplab/fsm"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/fsm/states"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"go.uber.org/zap"
)

// HandleProvisioningEvent can be called to determine whether the given incoming event follows an expected lifecycle of a machine considering the event history of the given provisioning event container.
//
// The function returns a new provisioning event container that can then be safely persisted in the database. If an error is returned, the incoming event is not supposed to be persisted in the database.
//
// Among other things, this function can detect crash loops or other irregularities within a machine lifecycle and enriches the returned provisioning event container with this information.
func HandleProvisioningEvent(log *zap.SugaredLogger, ec *metal.ProvisioningEventContainer, event *metal.ProvisioningEvent) (*metal.ProvisioningEventContainer, error) {
if ec == nil {
return nil, fmt.Errorf("provisioning event container must not be nil")
}

if event == nil {
return nil, fmt.Errorf("provisioning event must not be nil")
func HandleProvisioningEvent(c *states.StateConfig) (*metal.ProvisioningEventContainer, error) {
if err := c.Validate(); err != nil {
return nil, err
}

var (
clone = *ec
container = &clone
f = fsm.NewFSM(
initialStateFromEventContainer(container),
f = fsm.NewFSM(
initialStateFromEventContainer(c.Container),
Events(),
eventCallbacks(&states.StateConfig{Log: log, Event: event, Container: container}),
eventCallbacks(c),
)
)

err := f.Event(event.Event.String())
err := f.Event(c.Event.Event.String())
if err == nil {
return container, nil
return c.Container, nil
}

if errors.As(err, &fsm.InvalidEventError{}) {
if event.Message == "" {
event.Message = fmt.Sprintf("[unexpectedly received in %s]", strings.ToLower(f.Current()))
if c.Event.Message == "" {
c.Event.Message = fmt.Sprintf("[unexpectedly received in %s]", strings.ToLower(f.Current()))
} else {
event.Message = fmt.Sprintf("[unexpectedly received in %s]: %s", strings.ToLower(f.Current()), event.Message)
c.Event.Message = fmt.Sprintf("[unexpectedly received in %s]: %s", strings.ToLower(f.Current()), c.Event.Message)
}

container.LastEventTime = &event.Time
container.Liveliness = metal.MachineLivelinessAlive
container.LastErrorEvent = event
c.Container.LastEventTime = &c.Event.Time
c.Container.Liveliness = metal.MachineLivelinessAlive
c.Container.LastErrorEvent = c.Event

switch e := event.Event; e { //nolint:exhaustive
switch e := c.Event.Event; e { //nolint:exhaustive
case metal.ProvisioningEventPXEBooting, metal.ProvisioningEventPreparing:
container.CrashLoop = true
container.Events = append([]metal.ProvisioningEvent{*event}, container.Events...)
c.Container.CrashLoop = true
c.Container.Events = append([]metal.ProvisioningEvent{*c.Event}, c.Container.Events...)
case metal.ProvisioningEventAlive:
// under no circumstances we want to persists alive in the events container.
// when this happens the FSM gets stuck in invalid transitions
// (e.g. all following transitions are invalid and all subsequent alive events will be stored, cramping history).
default:
container.Events = append([]metal.ProvisioningEvent{*event}, container.Events...)
c.Container.Events = append([]metal.ProvisioningEvent{*c.Event}, c.Container.Events...)
}

return container, nil
return c.Container, nil
}

return nil, fmt.Errorf("internal error while calculating provisioning event container for machine %s: %w", container.ID, err)
return nil, fmt.Errorf("internal error while calculating provisioning event container for machine %s: %w", c.Container.ID, err)
}

func initialStateFromEventContainer(container *metal.ProvisioningEventContainer) string {
Expand Down
29 changes: 21 additions & 8 deletions cmd/metal-api/internal/fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/fsm/states"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -629,7 +630,13 @@ func TestHandleProvisioningEvent(t *testing.T) {
for i := range tests {
tt := tests[i]
t.Run(tt.name, func(t *testing.T) {
got, err := HandleProvisioningEvent(zaptest.NewLogger(t).Sugar(), tt.container, tt.event)
params := states.StateConfig{
Log: zaptest.NewLogger(t).Sugar(),
Container: tt.container,
Event: tt.event,
}

got, err := HandleProvisioningEvent(&params)
if diff := cmp.Diff(tt.wantErr, err); diff != "" {
t.Errorf("HandleProvisioningEvent() diff = %s", diff)
}
Expand All @@ -649,15 +656,21 @@ func TestReactionToAllIncomingEvents(t *testing.T) {
// this test ensures that for every incoming event we have a proper transition
for e1 := range metal.AllProvisioningEventTypes {
for e2 := range metal.AllProvisioningEventTypes {
_, err := HandleProvisioningEvent(zaptest.NewLogger(t).Sugar(), &metal.ProvisioningEventContainer{
Events: metal.ProvisioningEvents{
{
Event: e2,
params := states.StateConfig{
Log: zaptest.NewLogger(t).Sugar(),
Container: &metal.ProvisioningEventContainer{
Events: metal.ProvisioningEvents{
{
Event: e2,
},
},
},
}, &metal.ProvisioningEvent{
Event: e1,
})
Event: &metal.ProvisioningEvent{
Event: e1,
},
}

_, err := HandleProvisioningEvent(&params)
if err != nil {
t.Errorf("transitioning from state %s with event %s: %s", e2, e1, err)
}
Expand Down
18 changes: 18 additions & 0 deletions cmd/metal-api/internal/fsm/states/states.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package states

import (
"fmt"

"github.com/looplab/fsm"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/scaler"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -35,6 +38,21 @@ type StateConfig struct {
Log *zap.SugaredLogger
Container *metal.ProvisioningEventContainer
Event *metal.ProvisioningEvent
Scaler *scaler.PoolScaler
Machine *metal.Machine
Partition *metal.Partition
}

func (c *StateConfig) Validate() error {
if c.Container == nil {
return fmt.Errorf("provisioning event container must not be nil")
}

if c.Event == nil {
return fmt.Errorf("provisioning event must not be nil")
}

return nil
}

func AllStates(c *StateConfig) map[string]FSMState {
Expand Down
Loading