Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
add substitution (rough)
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed Sep 10, 2023
1 parent d52ef6e commit 61c82da
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 13 deletions.
4 changes: 4 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,7 @@ func (n *Node) Rate() float64 {
last := n.Samples.Peek()
return float64(len) / float64(time.Since(last.Start))
}

func (n *Node) String() string {
return n.URL
}
79 changes: 66 additions & 13 deletions node_ring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package caboose

import (
"fmt"
"strings"
"sync"

"github.com/willscott/hashring"
Expand Down Expand Up @@ -34,6 +36,40 @@ func (nr *NodeRing) updateRing() error {
return nil
}

// A score of '0' ==> overall experience is the same as the current state
// A positive score ==> overall experience is better than the current state
// A negative score ==> overall experience is worse than the current state
func (nr *NodeRing) getScoreForUpdate(candidate string, priority float64, weight int) float64 {
changes := nr.ring.ConsiderUpdateWeightedNode(candidate, weight)
delta := float64(0)
var neighbor *Node

for n, v := range changes {
neighbor = nr.Nodes[n]
neighborVolume := neighbor.Rate()
if neighborVolume < 1 {
neighborVolume = 1
}

amntChanged := v
// for now, add some bounds
if amntChanged < -1 {
amntChanged = -1
} else if amntChanged > 1 {
amntChanged = 1
}
// a negative amntChanged means that we're replacing the neighbor with the candidate.
amntChanged *= -1

// how much worse is candidate?
diff := priority - neighbor.Priority()
cs := diff * neighborVolume * float64(amntChanged)
delta += cs
// fmt.Printf("+%f (n %s: diff %f=(n %f - candidate %f) * volume %f * v = %f)", cs, neighbor.URL, diff, neighbor.Priority(), priority, neighborVolume, amntChanged)
}
return delta
}

func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold int64) (bool, error) {
nr.lk.Lock()
defer nr.lk.Unlock()
Expand All @@ -46,27 +82,32 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
}

// how much space is being claimed?
overlapEstimate := nr.ring.ConsiderUpdateWeightedNode(candidate.URL, 1)
delta := nr.getScoreForUpdate(candidate.URL, candidate.Priority(), 1)

var neighbor *Node
delta := float64(0)
if delta >= float64(activationThreshold) {
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}

for n, v := range overlapEstimate {
neighbor = nr.Nodes[n]
neighborVolume := neighbor.Rate()
if neighborVolume < 1 {
neighborVolume = 1
// not a clear benefit to add, but maybe acceptable for substitution:
worst := candidate.Priority()
worstN := ""
for _, n := range nr.Nodes {
if n.Priority() < worst {
worst = n.Priority()
worstN = n.URL
}

// how much worse is candidate?
diff := neighbor.Priority() - candidate.Priority()
delta += diff * neighborVolume * float64(v)
}

if delta > float64(activationThreshold) {
// todo: the '+1' is an arbitrary threshold to prevent thrashing. it should be configurable.
if worstN != "" && candidate.Priority()-worst > float64(activationThreshold)+1 {
nr.Nodes[candidate.URL] = candidate
delete(nr.Nodes, worstN)
return true, nr.updateRing()

}

// fmt.Printf("did not add - delta %f activation %d, node priority %f\n", delta, activationThreshold, candidate.Priority())
return false, nil
}

Expand Down Expand Up @@ -121,3 +162,15 @@ func (nr *NodeRing) Len() int {
defer nr.lk.RUnlock()
return nr.ring.Size()
}

func (nr *NodeRing) String() string {
nr.lk.RLock()
defer nr.lk.RUnlock()

ns := make([]string, 0, len(nr.Nodes))
for _, n := range nr.Nodes {
ns = append(ns, n.String())
}

return fmt.Sprintf("NodeRing[len %d]{%s}", nr.ring.Size(), strings.Join(ns, ","))
}

0 comments on commit 61c82da

Please sign in to comment.