Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

feat: modify pool tests #153

Merged
merged 44 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
17bb795
feat: modify pool tests
AmeanAsad Aug 31, 2023
00a4344
fix: surface caboose pool methods for testing
AmeanAsad Sep 1, 2023
0cd98cf
fix: top n node selection from heap
AmeanAsad Sep 1, 2023
b6b03f9
feat: add more comprehensive tests
AmeanAsad Sep 4, 2023
5537699
enhancement: add refresh no to tests
AmeanAsad Sep 4, 2023
778cd45
go fmt
AmeanAsad Sep 4, 2023
35646a8
remove unused metrics
aarshkshah1992 Sep 4, 2023
fa59f39
put back trace
aarshkshah1992 Sep 4, 2023
a23963d
Merge pull request #154 from filecoin-saturn/feat/remove-metrics
aarshkshah1992 Sep 4, 2023
d2c669e
response size does not include header
aarshkshah1992 Sep 4, 2023
ce46ce5
reset retry counter only if progress is made
aarshkshah1992 Sep 4, 2023
ca5522d
update go-car
aarshkshah1992 Sep 4, 2023
27b62be
dont drain response body
aarshkshah1992 Sep 4, 2023
296eaec
send verification errors to Saturn
aarshkshah1992 Sep 4, 2023
eb1e8b8
pool tier promotion
aarshkshah1992 Sep 4, 2023
2713f51
otel and send trace id to Saturn
aarshkshah1992 Sep 4, 2023
7375178
stabilize dynamics tests
willscott Sep 4, 2023
93135a7
mirroring parallel
aarshkshah1992 Sep 5, 2023
bda8d0d
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 5, 2023
c8be27d
pool-target-size through config to better test dynamics
willscott Sep 5, 2023
d52ef6e
down to flakiness
willscott Sep 6, 2023
61c82da
add substitution (rough)
willscott Sep 10, 2023
550cf5b
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
aarshkshah1992 Sep 18, 2023
c0ea85c
use new orchestrator API
aarshkshah1992 Sep 18, 2023
608a668
Merge pull request #161 from filecoin-saturn/feat/integrate-new-endpoint
aarshkshah1992 Sep 18, 2023
ea1d62b
fix: top N selection
AmeanAsad Sep 18, 2023
05c2b37
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
AmeanAsad Sep 18, 2023
c1ab0e9
enhancement: increase test size
AmeanAsad Sep 19, 2023
1975f49
feat: Add tests for affinity
AmeanAsad Sep 19, 2023
78f3490
test cache affinity
aarshkshah1992 Sep 19, 2023
5e02c7f
test cache affinity
aarshkshah1992 Sep 19, 2023
d8ae01e
remove assert
aarshkshah1992 Sep 19, 2023
b647fab
fix test
aarshkshah1992 Sep 19, 2023
0cf6c94
address review
aarshkshah1992 Sep 19, 2023
552ea1b
Merge pull request #163 from filecoin-saturn/feat/cache-aff-test
aarshkshah1992 Sep 19, 2023
9eb9c18
feat: port compliance cids
AmeanAsad Sep 19, 2023
af17595
fix: remove unused code
AmeanAsad Sep 19, 2023
310c079
modify harness
AmeanAsad Sep 19, 2023
8804f45
feat: add core attr to trace span
AmeanAsad Sep 19, 2023
3f63a01
Merge pull request #164 from filecoin-saturn/feat/port-compliance-cids
aarshkshah1992 Sep 20, 2023
da9ad17
Merge branch 'aa/test-simulator' into feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
46b5374
fix CI
aarshkshah1992 Sep 20, 2023
ad399fb
Merge pull request #155 from filecoin-saturn/feat/port-Caboose-main
aarshkshah1992 Sep 20, 2023
1015a7f
improve error classification (#165)
AmeanAsad Oct 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 50 additions & 9 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -30,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand All @@ -55,6 +59,9 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to reach maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

Expand All @@ -74,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -88,8 +98,11 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
Expand Down Expand Up @@ -129,7 +142,16 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
}

logger := newLogger(config)
Expand All @@ -144,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
Timeout: DefaultCarRequestTimeout,
}
}

c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport)

if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
Expand All @@ -152,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down Expand Up @@ -185,17 +214,29 @@ func (c *Caboose) Close() {

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
traceID := requestcontext.IDFromContext(ctx)
tid, err := trace.TraceIDFromHex(traceID)

ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
if err == nil {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tid,
SpanID: span.SpanContext().SpanID(),
Remote: true,
})
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}

return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -206,7 +247,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -218,14 +259,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
Expand Down
4 changes: 2 additions & 2 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down
8 changes: 7 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string {
// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the
// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming
// data was returned.
type ErrInvalidResponse error
type ErrInvalidResponse struct {
Message string
}

func (e ErrInvalidResponse) Error() string {
return e.Message
}
Loading
Loading