Skip to content

Commit

Permalink
Wait for a backend + timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
setaou committed Sep 1, 2023
1 parent 5e3f162 commit d84c843
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 24 deletions.
86 changes: 65 additions & 21 deletions balancer/wrr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"math/rand"
"slices"
"sync"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/zclconf/go-cty/cty"
"golang.org/x/sync/semaphore"

"mlb/backend"
"mlb/misc"
Expand All @@ -22,20 +24,25 @@ func init() {
}

type WRRBalancer struct {
id string
backends backend.BackendsMap
weightedlist []string
mu sync.RWMutex
log zerolog.Logger
upd_chan chan backend.BackendUpdate
source string
evalCtx *hcl.EvalContext
id string
backends backend.BackendsMap
weightedlist []string
mu sync.RWMutex
log zerolog.Logger
upd_chan chan backend.BackendUpdate
source string
evalCtx *hcl.EvalContext
ctx context.Context
ctx_cancel context.CancelFunc
wait_backends *semaphore.Weighted
timeout time.Duration
}

type WRRBalancerConfig struct {
ID string
Source string `hcl:"source"`
Weight hcl.Expression `hcl:"weight"`
ID string
Source string `hcl:"source"`
Weight hcl.Expression `hcl:"weight"`
Timeout string `hcl:"timeout,optional"`
}

type WRRBalancerFactory struct{}
Expand All @@ -49,38 +56,53 @@ func (w WRRBalancerFactory) parseConfig(tc *Config) *WRRBalancerConfig {
config := &WRRBalancerConfig{}
gohcl.DecodeBody(tc.Config, tc.ctx, config)
config.ID = fmt.Sprintf("balancer.%s.%s", tc.Type, tc.Name)
if config.Timeout == "" {
config.Timeout = "0s"
}
return config
}

func (w WRRBalancerFactory) New(tc *Config, wg *sync.WaitGroup, ctx context.Context) backend.BackendProvider {
config := w.parseConfig(tc)

b := &WRRBalancer{
id: config.ID,
backends: make(backend.BackendsMap),
weightedlist: make([]string, 0),
log: log.With().Str("id", config.ID).Logger(),
upd_chan: make(chan backend.BackendUpdate),
source: config.Source,
evalCtx: tc.ctx,
id: config.ID,
backends: make(backend.BackendsMap),
weightedlist: make([]string, 0),
log: log.With().Str("id", config.ID).Logger(),
upd_chan: make(chan backend.BackendUpdate),
source: config.Source,
evalCtx: tc.ctx,
wait_backends: semaphore.NewWeighted(1),
}

ctx, cancel := context.WithCancel(ctx)
var err error

b.timeout, err = time.ParseDuration(config.Timeout)
misc.PanicIfErr(err)

b.ctx, b.ctx_cancel = context.WithCancel(ctx)

wg.Add(1)
b.log.Info().Msg("WRR Balancer starting")

go func() {
defer wg.Done()
defer b.log.Info().Msg("WRR Balancer stopped")
defer cancel()
defer b.ctx_cancel()
defer close(b.upd_chan)

b.log.Debug().Msg("No backends in the list, acquiring the lock")
b.wait_backends.Acquire(b.ctx, 1)

mainloop:
for {
select {
case upd := <-b.upd_chan: // Backend changed
b.mu.Lock()

list_previous_size := len(b.weightedlist)

switch upd.Kind {
case backend.UpdBackendAdded:
var weight int
Expand Down Expand Up @@ -114,9 +136,20 @@ func (w WRRBalancerFactory) New(tc *Config, wg *sync.WaitGroup, ctx context.Cont
b.weightedlist = slices.DeleteFunc(b.weightedlist, func(a string) bool { return a == upd.Address })
delete(b.backends, upd.Address)
}

list_new_size := len(b.weightedlist)

if list_previous_size == 0 && list_new_size > 0 {
b.log.Debug().Msg("At least one backend has been added to the list, releasing the lock")
b.wait_backends.Release(1)
} else if list_previous_size > 0 && list_new_size == 0 {
b.log.Debug().Msg("There are no more backends in the list, acquiring the lock")
b.wait_backends.Acquire(b.ctx, 1)
}

b.mu.Unlock()

case <-ctx.Done(): // Context cancelled
case <-b.ctx.Done(): // Context cancelled
break mainloop
}
}
Expand All @@ -129,6 +162,17 @@ func (b *WRRBalancer) GetBackend() *backend.Backend {
b.mu.RLock()
defer b.mu.RUnlock()

// Wait for the backend list to be populated or a timeout to occur
if len(b.weightedlist) == 0 && b.timeout > 0 {
b.mu.RUnlock()
ctx, ctx_cancel := context.WithDeadline(b.ctx, time.Now().Add(b.timeout))
defer ctx_cancel()
if b.wait_backends.Acquire(ctx, 1) == nil {
b.wait_backends.Release(1)
}
b.mu.RLock()
}

if len(b.weightedlist) > 0 {
address := b.weightedlist[rand.Intn(len(b.weightedlist))]
return b.backends[address]
Expand Down
1 change: 1 addition & 0 deletions config.example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ backends_processor "simple_filter" "mysql_main_ro" {
balancer "wrr" "mysql_main_ro" {
source = backends_processor.simple_filter.mysql_main_ro
weight = backend.meta.consul.weight
timeout = "1s"
}

proxy "tcp" "mysql_main_ro" {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/rs/zerolog v1.30.0
golang.org/x/sys v0.11.0
golang.org/x/sync v0.3.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N0
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI=
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ func main() {

metrics.NewHTTPServer(conf.Metrics.Address, &wg, ctx)

// TODO: Replace that with a real retroaction from backend providers
time.Sleep(1 * time.Second) // Wait one second to ensure backends are available

// Start proxies
for _, c := range conf.ProxyList {
proxy.New(c, backendProviders, &wg, ctx)
Expand Down

0 comments on commit d84c843

Please sign in to comment.