diff --git a/node.go b/node.go index 4fc27ce..7424b41 100644 --- a/node.go +++ b/node.go @@ -131,3 +131,7 @@ func (n *Node) Rate() float64 { last := n.Samples.Peek() return float64(len) / float64(time.Since(last.Start)) } + +func (n *Node) String() string { + return n.URL +} diff --git a/node_ring.go b/node_ring.go index 0611687..dfe6b31 100644 --- a/node_ring.go +++ b/node_ring.go @@ -1,6 +1,8 @@ package caboose import ( + "fmt" + "strings" "sync" "github.com/willscott/hashring" @@ -34,6 +36,40 @@ func (nr *NodeRing) updateRing() error { return nil } +// A score of '0' ==> overall experience is the same as the current state +// A positive score ==> overall experience is better than the current state +// A negative score ==> overall experience is worse than the current state +func (nr *NodeRing) getScoreForUpdate(candidate string, priority float64, weight int) float64 { + changes := nr.ring.ConsiderUpdateWeightedNode(candidate, weight) + delta := float64(0) + var neighbor *Node + + for n, v := range changes { + neighbor = nr.Nodes[n] + neighborVolume := neighbor.Rate() + if neighborVolume < 1 { + neighborVolume = 1 + } + + amntChanged := v + // for now, add some bounds + if amntChanged < -1 { + amntChanged = -1 + } else if amntChanged > 1 { + amntChanged = 1 + } + // a negative amntChanged means that we're replacing the neighbor with the candidate. + amntChanged *= -1 + + // how much worse is candidate? + diff := priority - neighbor.Priority() + cs := diff * neighborVolume * float64(amntChanged) + delta += cs + // fmt.Printf("+%f (n %s: diff %f=(n %f - candidate %f) * volume %f * v = %f)", cs, neighbor.URL, diff, neighbor.Priority(), priority, neighborVolume, amntChanged) + } + return delta +} + func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold int64) (bool, error) { nr.lk.Lock() defer nr.lk.Unlock() @@ -46,27 +82,32 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in } // how much space is being claimed? - overlapEstimate := nr.ring.ConsiderUpdateWeightedNode(candidate.URL, 1) + delta := nr.getScoreForUpdate(candidate.URL, candidate.Priority(), 1) - var neighbor *Node - delta := float64(0) + if delta >= float64(activationThreshold) { + nr.Nodes[candidate.URL] = candidate + return true, nr.updateRing() + } - for n, v := range overlapEstimate { - neighbor = nr.Nodes[n] - neighborVolume := neighbor.Rate() - if neighborVolume < 1 { - neighborVolume = 1 + // not a clear benefit to add, but maybe acceptable for substitution: + worst := candidate.Priority() + worstN := "" + for _, n := range nr.Nodes { + if n.Priority() < worst { + worst = n.Priority() + worstN = n.URL } - - // how much worse is candidate? - diff := neighbor.Priority() - candidate.Priority() - delta += diff * neighborVolume * float64(v) } - if delta > float64(activationThreshold) { + // todo: the '+1' is an arbitrary threshold to prevent thrashing. it should be configurable. + if worstN != "" && candidate.Priority()-worst > float64(activationThreshold)+1 { nr.Nodes[candidate.URL] = candidate + delete(nr.Nodes, worstN) return true, nr.updateRing() + } + + // fmt.Printf("did not add - delta %f activation %d, node priority %f\n", delta, activationThreshold, candidate.Priority()) return false, nil } @@ -121,3 +162,15 @@ func (nr *NodeRing) Len() int { defer nr.lk.RUnlock() return nr.ring.Size() } + +func (nr *NodeRing) String() string { + nr.lk.RLock() + defer nr.lk.RUnlock() + + ns := make([]string, 0, len(nr.Nodes)) + for _, n := range nr.Nodes { + ns = append(ns, n.String()) + } + + return fmt.Sprintf("NodeRing[len %d]{%s}", nr.ring.Size(), strings.Join(ns, ",")) +}