diff --git a/caboose.go b/caboose.go index 91fe888..869fc11 100644 --- a/caboose.go +++ b/caboose.go @@ -63,8 +63,8 @@ type Config struct { // MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing. MaxRetrievalAttempts int - // MaxFetchFailuresBeforeCoolDown is the maximum number of retrieval failures across the pool for a key we will tolerate before we - // add the key to the cool down cache. + // MaxFetchFailuresBeforeCoolDown is the maximum number of retrieval failures across the pool for a URL before we auto-reject subsequent + // fetches of that URL. MaxFetchFailuresBeforeCoolDown int // FetchKeyCoolDownDuration is duration of time a key will stay in the cool down cache @@ -83,8 +83,11 @@ const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second const DefaultSaturnBlockRequestTimeout = 19 * time.Second const DefaultSaturnCarRequestTimeout = 30 * time.Minute -const DefaultMaxRetries = 3 -const DefaultMirrorFraction = 0.1 +// default retries before failure unless overridden by MaxRetrievalAttempts +const defaultMaxRetries = 3 + +// default fraction of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction +const defaultMirrorFraction = 0.01 const maxBlockSize = 4194305 // 4 Mib + 1 byte const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200" @@ -94,16 +97,15 @@ const DefaultPoolRefreshInterval = 5 * time.Minute // if we've seen a certain number of failures for it already in a given duration. 
// NOTE: before getting creative here, make sure you dont break end user flow // described in https://github.com/ipni/storetheindex/pull/1344 -const DefaultMaxFetchFailures = 3 * DefaultMaxRetries // this has to fail more than DefaultMaxRetries done for a single gateway request -const DefaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane person wait and stare at blank screen with "retry later" error before hitting F5? +const defaultMaxFetchFailures = 3 * defaultMaxRetries // this has to fail more than defaultMaxRetries done for a single gateway request +const defaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane person wait and stare at blank screen with "retry later" error before hitting F5? // we cool off sending requests to a Saturn node if it returns transient errors rather than immediately downvoting it; // however, only upto a certain max number of cool-offs. -const DefaultSaturnNodeCoolOff = 5 * time.Minute +const defaultSaturnNodeCoolOff = 5 * time.Minute var ErrNotImplemented error = errors.New("not implemented") var ErrNoBackend error = errors.New("no available saturn backend") -var ErrBackendFailed error = errors.New("saturn backend failed") var ErrContentProviderNotFound error = errors.New("saturn failed to find content providers") var ErrSaturnTimeout error = errors.New("saturn backend timed out") @@ -181,17 +183,17 @@ type DataCallback func(resource string, reader io.Reader) error // Every request will result in a remote network request. 
func NewCaboose(config *Config) (*Caboose, error) { if config.FetchKeyCoolDownDuration == 0 { - config.FetchKeyCoolDownDuration = DefaultFetchKeyCoolDownDuration + config.FetchKeyCoolDownDuration = defaultFetchKeyCoolDownDuration } if config.MaxFetchFailuresBeforeCoolDown == 0 { - config.MaxFetchFailuresBeforeCoolDown = DefaultMaxFetchFailures + config.MaxFetchFailuresBeforeCoolDown = defaultMaxFetchFailures } if config.SaturnNodeCoolOff == 0 { - config.SaturnNodeCoolOff = DefaultSaturnNodeCoolOff + config.SaturnNodeCoolOff = defaultSaturnNodeCoolOff } if config.MirrorFraction == 0 { - config.MirrorFraction = DefaultMirrorFraction + config.MirrorFraction = defaultMirrorFraction } if override := os.Getenv(BackendOverrideKey); len(override) > 0 { config.OrchestratorOverride = strings.Split(override, ",") @@ -222,7 +224,7 @@ func NewCaboose(config *Config) (*Caboose, error) { } if c.config.MaxRetrievalAttempts == 0 { - c.config.MaxRetrievalAttempts = DefaultMaxRetries + c.config.MaxRetrievalAttempts = defaultMaxRetries } // start the pool diff --git a/fetcher.go b/fetcher.go index 28ce84d..183765a 100644 --- a/fetcher.go +++ b/fetcher.go @@ -237,7 +237,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, reqCtx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil) + req, err := http.NewRequestWithContext(httpstat.WithHTTPStat(reqCtx, &result), http.MethodGet, reqUrl, nil) if err != nil { if recordIfContextErr(resourceType, reqCtx, "build-http-request") { return rm, reqCtx.Err() @@ -258,9 +258,6 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, agent := req.Header.Get("User-Agent") req.Header.Set("User-Agent", os.Getenv(SaturnEnvKey)+"/"+agent) - //trace - req = req.WithContext(httpstat.WithHTTPStat(req.Context(), &result)) - var resp *http.Response saturnCallsTotalMetric.WithLabelValues(resourceType).Add(1) 
startReq := time.Now() diff --git a/pool.go b/pool.go index 154566b..b20e156 100644 --- a/pool.go +++ b/pool.go @@ -36,6 +36,7 @@ func (p *pool) loadPool() ([]string, error) { if p.config.OrchestratorOverride != nil { return p.config.OrchestratorOverride, nil } + resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String()) if err != nil { goLogger.Warnw("failed to get backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String()) @@ -52,7 +53,7 @@ func (p *pool) loadPool() ([]string, error) { return responses, nil } -type poolRequest struct { +type mirroredPoolRequest struct { node string path string // the key for node affinity for the request @@ -66,7 +67,7 @@ type pool struct { started chan struct{} // started signals that we've already initialized the pool once with Saturn endpoints. refresh chan struct{} // refresh is used to signal the need for doing a refresh of the Saturn endpoints pool. done chan struct{} // done is used to signal that we're shutting down the Saturn endpoints pool and don't need to refresh it anymore. 
- mirrorSamples chan poolRequest + mirrorSamples chan mirroredPoolRequest fetchKeyLk sync.RWMutex fetchKeyFailureCache *cache.Cache // guarded by fetchKeyLk @@ -91,7 +92,7 @@ func newPool(c *Config) *pool { started: make(chan struct{}), refresh: make(chan struct{}, 1), done: make(chan struct{}, 1), - mirrorSamples: make(chan poolRequest, 10), + mirrorSamples: make(chan mirroredPoolRequest, 10), fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), @@ -331,7 +332,7 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk // mirror successful request if p.config.MirrorFraction > rand.Float64() { select { - case p.mirrorSamples <- poolRequest{node: nodes[i], path: fmt.Sprintf("/ipfs/%s?format=car&car-scope=block", c), key: aff}: + case p.mirrorSamples <- mirroredPoolRequest{node: nodes[i], path: fmt.Sprintf("/ipfs/%s?format=car&car-scope=block", c), key: aff}: default: } } @@ -428,7 +429,7 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba // sample request for mirroring if p.config.MirrorFraction > rand.Float64() { select { - case p.mirrorSamples <- poolRequest{node: nodes[i], path: pq[0], key: aff}: + case p.mirrorSamples <- mirroredPoolRequest{node: nodes[i], path: pq[0], key: aff}: default: } }