This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

feat: modify pool tests #153

Merged: 44 commits, Oct 10, 2023

Commits (44)
17bb795
feat: modify pool tests
AmeanAsad Aug 31, 2023
00a4344
fix: surface caboose pool methods for testing
AmeanAsad Sep 1, 2023
0cd98cf
fix: top n node selection from heap
AmeanAsad Sep 1, 2023
b6b03f9
feat: add more comprehensive tests
AmeanAsad Sep 4, 2023
5537699
enhancement: add refresh no to tests
AmeanAsad Sep 4, 2023
778cd45
go fmt
AmeanAsad Sep 4, 2023
35646a8
remove unused metrics
aarshkshah1992 Sep 4, 2023
fa59f39
put back trace
aarshkshah1992 Sep 4, 2023
a23963d
Merge pull request #154 from filecoin-saturn/feat/remove-metrics
aarshkshah1992 Sep 4, 2023
d2c669e
response size does not include header
aarshkshah1992 Sep 4, 2023
ce46ce5
reset retry counter only if progress is made
aarshkshah1992 Sep 4, 2023
ca5522d
update go-car
aarshkshah1992 Sep 4, 2023
27b62be
dont drain response body
aarshkshah1992 Sep 4, 2023
296eaec
send verification errors to Saturn
aarshkshah1992 Sep 4, 2023
eb1e8b8
pool tier promotion
aarshkshah1992 Sep 4, 2023
2713f51
otel and send trace id to Saturn
aarshkshah1992 Sep 4, 2023
7375178
stabilize dynamics tests
willscott Sep 4, 2023
93135a7
mirroring parallel
aarshkshah1992 Sep 5, 2023
bda8d0d
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 5, 2023
c8be27d
pool-target-size through config to better test dynamics
willscott Sep 5, 2023
d52ef6e
down to flakiness
willscott Sep 6, 2023
61c82da
add substitution (rough)
willscott Sep 10, 2023
550cf5b
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 18, 2023
c0ea85c
use new orchestrator API
aarshkshah1992 Sep 18, 2023
608a668
Merge pull request #161 from filecoin-saturn/feat/integrate-new-endpoint
aarshkshah1992 Sep 18, 2023
ea1d62b
fix: top N selection
AmeanAsad Sep 18, 2023
05c2b37
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
AmeanAsad Sep 18, 2023
c1ab0e9
enhancement: increase test size
AmeanAsad Sep 19, 2023
1975f49
feat: Add tests for affinity
AmeanAsad Sep 19, 2023
78f3490
test cache affinity
aarshkshah1992 Sep 19, 2023
5e02c7f
test cache affinity
aarshkshah1992 Sep 19, 2023
d8ae01e
remove assert
aarshkshah1992 Sep 19, 2023
b647fab
fix test
aarshkshah1992 Sep 19, 2023
0cf6c94
address review
aarshkshah1992 Sep 19, 2023
552ea1b
Merge pull request #163 from filecoin-saturn/feat/cache-aff-test
aarshkshah1992 Sep 19, 2023
9eb9c18
feat: port compliance cids
AmeanAsad Sep 19, 2023
af17595
fix: remove unused code
AmeanAsad Sep 19, 2023
310c079
modify harness
AmeanAsad Sep 19, 2023
8804f45
feat: add core attr to trace span
AmeanAsad Sep 19, 2023
3f63a01
Merge pull request #164 from filecoin-saturn/feat/port-compliance-cids
aarshkshah1992 Sep 20, 2023
da9ad17
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
46b5374
fix CI
aarshkshah1992 Sep 20, 2023
ad399fb
Merge pull request #155 from filecoin-saturn/feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
1015a7f
improve error classification (#165)
AmeanAsad Oct 3, 2023
26 changes: 26 additions & 0 deletions internal/util/harness.go
@@ -78,6 +78,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.Caboose = bs
ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing)
ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap)
ch.CaboosePool = conf.Harness.PoolController
return ch
}

@@ -87,11 +88,18 @@ type CabooseHarness struct {

CabooseActiveNodes *caboose.NodeRing
CabooseAllNodes *caboose.NodeHeap
CaboosePool state.PoolController

gol sync.Mutex
goodOrch bool
}

type NodeStats struct {
Start time.Time
Latency float64
Size float64
}

func (ch *CabooseHarness) RunFetchesForRandCids(n int) {
for i := 0; i < n; i++ {
randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)})
@@ -122,6 +130,24 @@ func (ch *CabooseHarness) FetchAndAssertSuccess(t *testing.T, ctx context.Contex
require.NotEmpty(t, blk)
}

func (ch *CabooseHarness) RecordSuccesses(t *testing.T, nodes []*caboose.Node, s NodeStats, n int) {
for _, node := range(nodes) {
s.Start = time.Now().Add(-time.Second*5)
for i := 0; i < n; i++ {
node.RecordSuccess(s.Start, s.Latency, s.Size)
}
}
}

func (ch *CabooseHarness) RecordFailures(t *testing.T, nodes []*caboose.Node, n int) {
for _, node := range(nodes) {
for i := 0; i < n; i++ {
node.RecordFailure()
}
}
}


func (ch *CabooseHarness) FailNodesWithCode(t *testing.T, selectorF func(ep *Endpoint) bool, code int) {
for _, n := range ch.Endpoints {
if selectorF(n) {
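For orientation, a hedged sketch of how a test might drive these new helpers; the node slices and stat values here are illustrative, and the real usage appears in pool_dynamics_test.go below:

```go
// Hypothetical test snippet (not part of the diff).
good := util.NodeStats{Latency: 10, Size: 1000} // low latency, large responses
bad := util.NodeStats{Latency: 1000, Size: 10}  // high latency, small responses

ch.RecordSuccesses(t, goodNodes, good, 10) // ten good samples per node
ch.RecordSuccesses(t, badNodes, bad, 10)
ch.RecordFailures(t, badNodes, 5) // plus some failures on the slow nodes
ch.CaboosePool.DoRefresh()        // re-rank nodes using the recorded stats
```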
11 changes: 8 additions & 3 deletions node_heap.go
@@ -53,9 +53,14 @@ func (nh *NodeHeap) TopN(n int) []*Node {
m := make([]*Node, 0, n)
nh.lk.RLock()
defer nh.lk.RUnlock()
for i := 0; i < n && i < len(nh.Nodes); i++ {
node := nh.Nodes[i]
m = append(m, node)
for i := 0; i < n; i++ {
Contributor Author:

@willscott noticed a slight issue with the use of heaps here.

The heap keeps itself sorted upon insertion and removal of nodes. However, the existing nodes in the heap have dynamically changing values, and the heap does not re-sort itself every time one of those values changes.

The strategy I've implemented here is to select the top N nodes from the heap by calling .Pop() n times to retrieve the top nodes, then adding them back using .Push(). This is a similar flow to the heap's .Fix() method, which repositions an item in the heap when its value changes. Not the most efficient, but we are dealing with fairly small heap sizes here.

Also wondering whether we still think a priority queue is the best data structure for the AllNodes tier. My understanding of the code / implementation is that ActiveNodes is always a subset of AllNodes; they are not mutually exclusive sets like the previous "Main" and "Unknown" tier design.
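To make the Pop-then-Push selection concrete, here is a self-contained sketch against Go's container/heap; IntHeap is an illustrative stand-in, not Caboose's NodeHeap:

```go
package main

import (
	"container/heap"
	"fmt"
)

// IntHeap is a max-heap of ints, standing in for the NodeHeap.
type IntHeap []int

func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] > h[j] } // max-heap: larger is "better"
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *IntHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *IntHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topN pops the n best items (each Pop restores heap order), then
// pushes them all back, mirroring the strategy described above.
func topN(h *IntHeap, n int) []int {
	m := make([]int, 0, n)
	for i := 0; i < n && h.Len() > 0; i++ {
		m = append(m, heap.Pop(h).(int))
	}
	for _, v := range m {
		heap.Push(h, v)
	}
	return m
}

func main() {
	h := &IntHeap{3, 9, 1, 7, 5}
	heap.Init(h)
	fmt.Println(topN(h, 3)) // [9 7 5]
}
```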

Contributor Author:

@willscott One issue I noticed with my changes is that, since the nodes are temporarily removed from the heap, any stats being written to them concurrently will fail.

Contributor:

What do you mean by "fail"? A temporarily dropped metric isn't the end of the world, potentially.

If not a priority queue for all nodes, how would you imagine keeping track of the whole set?

Contributor Author:

> What do you mean by "fail"? A temporarily dropped metric isn't the end of the world, potentially.

What I mean by "fail" is that any concurrent operation that accesses the heap while the top-N selection is occurring might get an incorrect list. I do agree with you though; it doesn't seem that damaging, especially since the "AllNodes" set is just used for testing nodes.

> If not a priority queue for all nodes, how would you imagine keeping track of the whole set?

Wondering if a regular array structure would work better here. It's simpler and gives us the functionality we need (top-N nodes, random nodes, node removal). We're not really using the properties of a priority queue, because the values of each node in the queue change constantly and we have to re-sort the nodes every time we make a selection.
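A rough sketch of the plain-slice alternative floated here, re-sorting on every selection; the node type is a stand-in, though PredictedThroughput mirrors the field printed elsewhere in this PR:

```go
package main

import (
	"fmt"
	"sort"
)

// node is an illustrative stand-in for caboose.Node.
type node struct {
	URL                 string
	PredictedThroughput float64
}

// topN re-sorts on every call, which is honest about the fact that node
// values drift between selections anyway. O(n log n) per call, trivial
// for pools of this size.
func topN(nodes []node, n int) []node {
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].PredictedThroughput > nodes[j].PredictedThroughput
	})
	if n > len(nodes) {
		n = len(nodes)
	}
	return nodes[:n]
}

func main() {
	pool := []node{{"a", 10}, {"b", 30}, {"c", 20}}
	fmt.Println(topN(pool, 2)) // [{b 30} {c 20}]
}
```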

Contributor:

  • We can add add/remove methods that are a simple enough API.
  • I think keeping the nodes in a roughly sorted order for consideration isn't a bad idea, as we'll want to access the better ones.
  • If we run into performance issues with the push/pop semantics on updates, we can get lazy about it and use the heap's Fix when we need to get the top N.

Contributor:

One note on Go semantics here: the nh.lk.RLock() above takes a read-only lock over the data structure, but the Pop and Push you're proposing in this change perform a read-write operation.

Contributor Author:

Thanks for the tip here, Will; always good to learn :)

node := heap.Pop(nh)
if n, ok := node.(*Node); ok {
m = append(m, n)
}
}
for _, n := range(m) {
heap.Push(nh, n)
}
return m
}
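Picking up the read-lock note from the thread above: since heap.Pop and heap.Push mutate the heap, TopN needs the write lock. A sketch of that fix, assuming (as the surrounding diff suggests) that NodeHeap's heap.Interface methods do not themselves take lk:

```go
func (nh *NodeHeap) TopN(n int) []*Node {
	m := make([]*Node, 0, n)
	nh.lk.Lock() // write lock: Pop/Push mutate the underlying slice
	defer nh.lk.Unlock()
	// Also guard against popping more items than the heap holds.
	for i := 0; i < n && nh.Len() > 0; i++ {
		if node, ok := heap.Pop(nh).(*Node); ok {
			m = append(m, node)
		}
	}
	for _, node := range m {
		heap.Push(nh, node)
	}
	return m
}
```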
22 changes: 11 additions & 11 deletions node_ring.go
@@ -8,23 +8,23 @@ import (

// NodeRing represents a set of nodes organized for stable hashing.
type NodeRing struct {
nodes map[string]*Node
Nodes map[string]*Node
ring hashring.HashRing

lk sync.RWMutex
}

func NewNodeRing() *NodeRing {
return &NodeRing{
nodes: map[string]*Node{},
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
}
}

func (nr *NodeRing) updateRing() error {
// this method expects that the lk is held when called.
rs := make(map[string]int)
for _, n := range nr.nodes {
for _, n := range nr.Nodes {
// TODO: weight multiples
rs[n.URL] = 1
}
@@ -39,7 +39,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
_, ok := nr.ring.GetNode(candidate.URL)
if !ok {
// ring is empty. in this case we always want to add.
nr.nodes[candidate.URL] = candidate
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}

@@ -50,7 +50,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
delta := float64(0)

for n, v := range overlapEstimate {
neighbor = nr.nodes[n]
neighbor = nr.Nodes[n]
neighborVolume := neighbor.Rate()

// how much worse is candidate?
@@ -59,7 +59,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
}

if delta > float64(activationThreshold) {
nr.nodes[candidate.URL] = candidate
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}
return false, nil
@@ -68,16 +68,16 @@ func (nr *NodeRing) Add(n *Node) error {
func (nr *NodeRing) Add(n *Node) error {
nr.lk.Lock()
defer nr.lk.Unlock()
nr.nodes[n.URL] = n
nr.Nodes[n.URL] = n
return nr.updateRing()
}

func (nr *NodeRing) Remove(n *Node) error {
nr.lk.Lock()
defer nr.lk.Unlock()

if _, ok := nr.nodes[n.URL]; ok {
delete(nr.nodes, n.URL)
if _, ok := nr.Nodes[n.URL]; ok {
delete(nr.Nodes, n.URL)
return nr.updateRing()
}
return ErrNoBackend
@@ -87,7 +87,7 @@ func (nr *NodeRing) Contains(n *Node) bool {
nr.lk.RLock()
defer nr.lk.RUnlock()

_, ok := nr.nodes[n.URL]
_, ok := nr.Nodes[n.URL]
return ok
}

Expand All @@ -104,7 +104,7 @@ func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) {
}
nodes := make([]*Node, 0, len(keys))
for _, k := range keys {
if n, ok := nr.nodes[k]; ok {
if n, ok := nr.Nodes[k]; ok {
nodes = append(nodes, n)
}
}
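The nodes → Nodes rename exports the membership map so tests outside the package can inspect the ring. A minimal sketch of the kind of check this enables; constructing a Node with a bare struct literal is an assumption and may not match real initialization:

```go
// Hypothetical membership check enabled by the exported field
// (assumes testify's require is imported).
nr := caboose.NewNodeRing()
n := &caboose.Node{URL: "node-a.example.com"} // assumed constructible this way
require.NoError(t, nr.Add(n))

_, ok := nr.Nodes[n.URL]
require.True(t, ok)
require.True(t, nr.Contains(n))
```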
79 changes: 78 additions & 1 deletion pool_dynamics_test.go
@@ -1,13 +1,90 @@
package caboose_test

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/filecoin-saturn/caboose/internal/util"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
)


const (
nodesSize = 10
nodesPoolSize = caboose.PoolConsiderationCount
)
aarshkshah1992 (Contributor), Sep 19, 2023:

@AmeanAsad @willscott

Why do we need to do this?

Contributor Author:

This is used to change the desired pool size for testing. I reduced the pool size while testing cache affinity to make it easier to debug and get the tests passing. Once those pass, we can set it back to a high / realistic number.

Contributor:

Okay, yeah that makes sense.

Contributor:

This has already been structured so that the lower pool target is only used in the test context. A larger pool size target is used when running normally.



func TestPoolDynamics(t *testing.T) {
ch := util.BuildCabooseHarness(t, 3, 3)

ch := util.BuildCabooseHarness(t, nodesSize , 3)
ch.StartOrchestrator()
baseStatSize := 100
baseStatLatency := 100
ctx := context.Background()

testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock)

ch.FetchAndAssertSuccess(t, ctx, testCid)

rand.New(rand.NewSource(0))
eps := ch.Endpoints
controlGroup := make(map[string]string)

rand.Shuffle(len(eps), func(i, j int) {
eps[i], eps[j] = eps[j], eps[i]
})

for _,ep := range(eps[:nodesPoolSize]) {
url, _ := url.Parse(ep.Server.URL)
controlGroup[url.Host] = ep.Server.URL

}

for i := 0; i < 1; i++ {
nodes := ch.CabooseAllNodes
goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)
goodStats := util.NodeStats{
Start: time.Now().Add(-time.Second*6),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}
badStats := util.NodeStats{
Start: time.Now().Add(-time.Second*6),
Latency: float64(baseStatLatency) * float64(10),
Size: float64(baseStatSize) / float64(10),
}
for _,n := range(nodes.Nodes) {
_, ok := controlGroup[n.URL]
if ok {
fmt.Println("Good", n.URL)

goodNodes = append(goodNodes, n)
} else {
fmt.Println("Bad", n.URL)

badNodes = append(badNodes, n)
}
}

ch.RecordSuccesses(t, goodNodes, goodStats, 10)
ch.RecordSuccesses(t, badNodes, badStats, 10)

}


ch.CaboosePool.DoRefresh()
fmt.Println("Pool", ch.CabooseActiveNodes.Nodes)

for _,n := range(ch.CabooseAllNodes.Nodes) {
fmt.Println("Node", n.URL, "size", n.PredictedThroughput)
}

}
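The test currently prints the pool state rather than asserting on it. A possible follow-up assertion, sketched under the assumption that the active ring should converge on the control group after DoRefresh (testify's require is assumed imported):

```go
// Hypothetical assertion (not in the diff): every promoted node should
// come from the control group that received the good stats.
for _, n := range ch.CabooseActiveNodes.Nodes {
	_, ok := controlGroup[n.URL]
	require.True(t, ok, "active node %s should be in the control group", n.URL)
}
```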
11 changes: 8 additions & 3 deletions pool_tier_promotion.go
@@ -1,19 +1,24 @@
package caboose

import "fmt"

const (
poolConsiderationCount = 30
PoolConsiderationCount = 3
activationThreshold = 0
)

func updateActiveNodes(active *NodeRing, all *NodeHeap) error {
candidates := all.TopN(poolConsiderationCount)
candidates := all.TopN(PoolConsiderationCount)
for _, c := range(candidates) {
fmt.Println("Candidates", c.URL, c.PredictedThroughput)
}
added := 0
for _, c := range candidates {
if active.Contains(c) {
continue
}
activeSize := active.Len()
discount := poolConsiderationCount - activeSize
discount := PoolConsiderationCount - activeSize
if discount < 0 {
discount = 0
}
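To make the cut-off arithmetic concrete, a worked example of the discount clamp; the diff ends before discount's use site, so treating it as a relaxation of the activation threshold while the ring is under-filled is an assumption:

```go
// With PoolConsiderationCount = 3:
for _, activeSize := range []int{0, 2, 5} {
	discount := PoolConsiderationCount - activeSize
	if discount < 0 {
		discount = 0
	}
	fmt.Printf("activeSize=%d -> discount=%d\n", activeSize, discount)
}
// Output:
// activeSize=0 -> discount=3
// activeSize=2 -> discount=1
// activeSize=5 -> discount=0
```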