Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
pool-target-size through config to better test dynamics
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed Sep 5, 2023
1 parent 7375178 commit c8be27d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 16 deletions.
7 changes: 7 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to reach maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

Expand Down Expand Up @@ -90,6 +93,7 @@ const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
Expand Down Expand Up @@ -155,6 +159,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
if c.config.PoolTargetSize == 0 {
c.config.PoolTargetSize = DefaultPoolTargetSize
}

if c.config.MaxRetrievalAttempts == 0 {
c.config.MaxRetrievalAttempts = defaultMaxRetries
Expand Down
5 changes: 3 additions & 2 deletions node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func (nh *NodeHeap) PeekRandom() *Node {

func (nh *NodeHeap) TopN(n int) []*Node {
m := make([]*Node, 0, n)
nh.lk.RLock()
defer nh.lk.RUnlock()
nh.lk.Lock()
defer nh.lk.Unlock()
heap.Init(nh)
for i := 0; i < n && i < len(nh.Nodes); i++ {
node := nh.Nodes[i]
m = append(m, node)
Expand Down
12 changes: 7 additions & 5 deletions node_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (

// NodeRing represents a set of nodes organized for stable hashing.
type NodeRing struct {
Nodes map[string]*Node
ring hashring.HashRing
Nodes map[string]*Node
ring hashring.HashRing
targetSize int

lk sync.RWMutex
}

func NewNodeRing() *NodeRing {
func NewNodeRing(targetSize int) *NodeRing {
return &NodeRing{
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
targetSize: targetSize,
}
}

Expand Down
2 changes: 1 addition & 1 deletion node_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestNodeRing(t *testing.T) {
nr := caboose.NewNodeRing()
nr := caboose.NewNodeRing(30)
nodes := make([]*caboose.Node, 0)
for i := 0; i < 100; i++ {
nodes = append(nodes, &caboose.Node{URL: fmt.Sprintf("node%d", i)})
Expand Down
2 changes: 1 addition & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newPool(c *Config, logger *logger) *pool {
fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),
fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),

ActiveNodes: NewNodeRing(),
ActiveNodes: NewNodeRing(c.PoolTargetSize),
AllNodes: NewNodeHeap(),
}

Expand Down
5 changes: 3 additions & 2 deletions pool_dynamics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ are picked randomly in the beginning of each test. At the end of each test, the
always be converging to the "good" nodes.
*/
func TestPoolDynamics(t *testing.T) {

baseStatSize := 100000
baseStatLatency := 100
poolRefreshNo := 10
Expand Down Expand Up @@ -241,7 +240,9 @@ func TestPoolDynamics(t *testing.T) {
}

func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) {
ch := util.BuildCabooseHarness(t, nodesSize, 3)
ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) {
config.PoolTargetSize = 3
})

ch.StartOrchestrator()

Expand Down
9 changes: 4 additions & 5 deletions pool_tier_promotion.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package caboose

const (
PoolConsiderationCount = 30
activationThreshold = 0
var (
activationThreshold = 0
)

func updateActiveNodes(active *NodeRing, all *NodeHeap) error {
candidates := all.TopN(PoolConsiderationCount)
candidates := all.TopN(active.targetSize)
added := 0
for _, c := range candidates {
if active.Contains(c) {
continue
}
activeSize := active.Len()
discount := PoolConsiderationCount - activeSize
discount := active.targetSize - activeSize
if discount < 0 {
discount = 0
}
Expand Down

0 comments on commit c8be27d

Please sign in to comment.