Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #164 from filecoin-saturn/feat/port-compliance-cids
Browse files Browse the repository at this point in the history
feat: port compliance cids
  • Loading branch information
aarshkshah1992 authored Sep 20, 2023
2 parents 05c2b37 + 8804f45 commit 3f63a01
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 66 deletions.
23 changes: 19 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"
Expand Down Expand Up @@ -34,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -81,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -95,10 +98,12 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you don't break end user flow
Expand Down Expand Up @@ -137,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
Expand Down Expand Up @@ -166,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down
6 changes: 5 additions & 1 deletion fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
return ce
}

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime)))
p.ActiveNodes.lk.RLock()
isCore := p.ActiveNodes.IsCore(from)
p.ActiveNodes.lk.RUnlock()

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore)))
defer span.End()

requestId := uuid.NewString()
Expand Down
16 changes: 15 additions & 1 deletion internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.Endpoints[i].Setup()
ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, "https://")

cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(ip))
cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock))

purls[i] = state.NodeInfo{
IP: ip,
Expand Down Expand Up @@ -77,6 +77,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
PoolRefresh: time.Second * 50,
MaxRetrievalAttempts: maxRetries,
Harness: &state.State{},

MirrorFraction: 1.0,
}

for _, opt := range opts {
Expand Down Expand Up @@ -257,6 +259,18 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) {
}
}

// WithComplianceCidPeriod returns a harness option that overwrites
// ComplianceCidPeriod on the caboose config with n.
func WithComplianceCidPeriod(n int64) func(config *caboose.Config) {
	return func(cfg *caboose.Config) {
		cfg.ComplianceCidPeriod = n
	}
}

// WithMirrorFraction returns a harness option that overwrites MirrorFraction
// on the caboose config with n.
func WithMirrorFraction(n float64) func(config *caboose.Config) {
	return func(cfg *caboose.Config) {
		cfg.MirrorFraction = n
	}
}

func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.FetchKeyCoolDownDuration = duration
Expand Down
5 changes: 5 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ var (
mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"),
}, []string{"error_status"})

complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"),
}, []string{"error_status"})
)

var CabooseMetrics = prometheus.NewRegistry()
Expand Down Expand Up @@ -163,6 +167,7 @@ func init() {
CabooseMetrics.MustRegister(saturnCallsTotalMetric)
CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric)
CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric)
CabooseMetrics.MustRegister(complianceCidCallsTotalMetric)

CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric)

Expand Down
13 changes: 9 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/zyedidia/generic/queue"
)

Expand All @@ -14,7 +15,9 @@ const (
)

type Node struct {
URL string
URL string
ComplianceCid string
Core bool

PredictedLatency float64
PredictedThroughput float64
Expand All @@ -25,10 +28,12 @@ type Node struct {
lk sync.RWMutex
}

func NewNode(url string) *Node {
func NewNode(info state.NodeInfo) *Node {
return &Node{
URL: url,
Samples: queue.New[NodeSample](),
URL: info.IP,
ComplianceCid: info.ComplianceCid,
Core: info.Core,
Samples: queue.New[NodeSample](),
}
}

Expand Down
10 changes: 8 additions & 2 deletions node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package caboose

import (
"container/heap"
"math/rand"
"sync"
)

Expand Down Expand Up @@ -45,8 +46,13 @@ func (nh *NodeHeap) Best() *Node {
// PeekRandom returns a randomly selected node from nh.Nodes without removing
// it. It returns nil when the heap holds no nodes.
//
// The heap's read lock is held for the duration of the call.
func (nh *NodeHeap) PeekRandom() *Node {
	nh.lk.RLock()
	defer nh.lk.RUnlock()

	// rand.Intn panics on a non-positive argument, so the empty case must be
	// handled before picking an index.
	if len(nh.Nodes) == 0 {
		return nil
	}

	return nh.Nodes[rand.Intn(len(nh.Nodes))]
}

func (nh *NodeHeap) TopN(n int) []*Node {
Expand Down
11 changes: 11 additions & 0 deletions node_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ func (nr *NodeRing) Contains(n *Node) bool {
return ok
}

// IsCore reports whether the ring's entry for n.URL is flagged as a core node.
// It returns false when the ring has no entry under that URL.
//
// IsCore takes nr.lk for reading itself, so callers must not already hold
// nr.lk: sync.RWMutex read locks are not safe to acquire recursively.
func (nr *NodeRing) IsCore(n *Node) bool {
	nr.lk.RLock()
	defer nr.lk.RUnlock()

	// Look the node up by URL and read the flag from the ring's own entry
	// rather than from the n that was passed in.
	if entry, found := nr.Nodes[n.URL]; found {
		return entry.Core
	}
	return false
}

func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) {
nr.lk.RLock()
defer nr.lk.RUnlock()
Expand Down
47 changes: 37 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package caboose

import (
"context"
cryptoRand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"github.com/filecoin-saturn/caboose/internal/state"
"io"
"math/big"
"math/rand"
"net/url"
"sync"
"time"

"github.com/filecoin-saturn/caboose/internal/state"

"github.com/patrickmn/go-cache"

"github.com/ipfs/boxo/path"
Expand All @@ -25,9 +28,11 @@ const (
defaultMirroredConcurrency = 5
)

var complianceCidReqTemplate = "/ipfs/%s?format=raw"

// loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the
// Orchestrator.
func (p *pool) loadPool() ([]string, error) {
func (p *pool) loadPool() ([]state.NodeInfo, error) {
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
Expand All @@ -48,13 +53,7 @@ func (p *pool) loadPool() ([]string, error) {

goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String())

var ips []string

for _, r := range responses {
ips = append(ips, r.IP)
}

return ips, nil
return responses, nil
}

type mirroredPoolRequest struct {
Expand Down Expand Up @@ -149,14 +148,27 @@ func (p *pool) refreshPool() {
}
}

// fetchComplianceCid requests node's own compliance cid from node, as a raw
// block, via fetchResourceAndUpdate with the pool's mirror validator.
//
// It returns an error without issuing a request when the node carries no
// compliance cid; otherwise it returns whatever fetchResourceAndUpdate returns.
// The request is bounded by a 30 second timeout.
func (p *pool) fetchComplianceCid(node *Node) error {
	complianceCid := node.ComplianceCid
	if complianceCid == "" {
		goLogger.Warnw("failed to find compliance cid ", "for node", node)
		return fmt.Errorf("compliance cid doesn't exist for node: %s ", node)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	resource := fmt.Sprintf(complianceCidReqTemplate, complianceCid)
	goLogger.Debugw("fetching compliance cid", "cid", resource, "from", node)

	return p.fetchResourceAndUpdate(ctx, node, resource, 0, p.mirrorValidator)
}

func (p *pool) checkPool() {
sem := make(chan struct{}, defaultMirroredConcurrency)

for {
select {
case msg := <-p.mirrorSamples:
sem <- struct{}{}

go func(msg mirroredPoolRequest) {
defer func() { <-sem }()

Expand All @@ -169,11 +181,26 @@ func (p *pool) checkPool() {
return
}
if p.ActiveNodes.Contains(testNode) {
rand := big.NewInt(1)
if p.config.ComplianceCidPeriod > 0 {
rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod))
}

if rand.Cmp(big.NewInt(0)) == 0 {
err := p.fetchComplianceCid(testNode)
if err != nil {
goLogger.Warnw("failed to fetch compliance cid ", "err", err)
complianceCidCallsTotalMetric.WithLabelValues("error").Add(1)
} else {
complianceCidCallsTotalMetric.WithLabelValues("success").Add(1)
}
}
return
}

trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)

cancel()
if err != nil {
mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
Expand Down
23 changes: 22 additions & 1 deletion pool_refresh_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package caboose

import (
"math/rand"
"testing"

"github.com/filecoin-saturn/caboose/internal/state"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand All @@ -29,8 +33,25 @@ func TestPoolRefresh(t *testing.T) {
}

func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) {
for _, n := range nodes {
nodeStructs := genNodeStructs(nodes)
for _, n := range nodeStructs {
p.AllNodes.AddIfNotPresent(NewNode(n))
}
require.Equal(t, expectedTotal, p.AllNodes.Len())
}

// genNodeStructs builds one state.NodeInfo per entry in nodes, in order. Each
// entry is used as both IP and ID, Weight and Distance are random, and
// ComplianceCid is the string form of a raw/sha2-256 CIDv1 computed over the
// entry's bytes. A nil slice is returned when nodes is empty.
func genNodeStructs(nodes []string) []state.NodeInfo {
	var infos []state.NodeInfo

	// The builder parameters are identical for every node, so build it once.
	builder := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}

	for i := range nodes {
		name := nodes[i]
		// The error from Sum is dropped on purpose: this helper has no way to
		// report it.
		complianceCid, _ := builder.Sum([]byte(name))
		infos = append(infos, state.NodeInfo{
			IP:            name,
			ID:            name,
			Weight:        rand.Intn(100),
			Distance:      rand.Float32(),
			ComplianceCid: complianceCid.String(),
		})
	}

	return infos
}
Loading

0 comments on commit 3f63a01

Please sign in to comment.