Skip to content

Commit

Permalink
disable failure auto-backoff when running dev-mode below known determ…
Browse files Browse the repository at this point in the history
…inisticly failing block
  • Loading branch information
sduchesneau committed Jun 2, 2023
1 parent 54c69b3 commit 2c0028c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
36 changes: 34 additions & 2 deletions service/failure_backoff.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package service

import "time"
import (
"regexp"
"strconv"
"time"

"github.com/streamingfast/bstream"
)

// fail fast when the exact same request has already failed twice, preventing waste of tier2 resources
var FailureBlacklistMinimalCount = 0
Expand All @@ -12,17 +18,32 @@ var FailureForcedBackoffLimit = time.Second * 30

type recordedFailure struct {
lastAt time.Time
atBlock uint64
count int
forcedBackoff time.Duration
lastError error
}

func (s *Tier1Service) errorFromRecordedFailure(id string) error {
func (s *Tier1Service) errorFromRecordedFailure(id string, isProductionMode bool, startBlock int64, startCursor string) error {
if startBlock < 0 {
return nil
}
s.failedRequestsLock.RLock()
defer s.failedRequestsLock.RUnlock()
if failure, ok := s.failedRequests[id]; ok {
if failure.count > FailureBlacklistMinimalCount {
if time.Since(failure.lastAt) < FailureBlacklistDuration {

// dev-mode requests below the failure point will still be processed on tier1
if !isProductionMode {
if uint64(startBlock) < failure.atBlock {
cur, err := bstream.CursorFromOpaque(startCursor)
if err != nil || cur.Block.Num() < failure.atBlock {
return nil
}
}
}

time.Sleep(failure.forcedBackoff)
if failure.forcedBackoff < FailureForcedBackoffLimit {
failure.forcedBackoff += FailureForcedBackoffIncrement
Expand All @@ -36,6 +57,9 @@ func (s *Tier1Service) errorFromRecordedFailure(id string) error {
return nil
}

// Error: rpc error: code = InvalidArgument desc = step new irr: handler step new: execute modules: applying executor results ... store wasm call: block 300: module "store_eth_stats": wasm execution failed ...
var blockFailureRE = regexp.MustCompile(`store wasm call: block ([0-9]*): module "([^"]*)"`)

func (s *Tier1Service) recordFailure(requestID string, err error) {
s.failedRequestsLock.Lock()
defer s.failedRequestsLock.Unlock()
Expand All @@ -44,6 +68,14 @@ func (s *Tier1Service) recordFailure(requestID string, err error) {
failure = &recordedFailure{}
s.failedRequests[requestID] = failure
}
if out := blockFailureRE.FindAllStringSubmatch(err.Error(), -1); out != nil {
if len(out[0]) == 3 {
if val, err := strconv.ParseUint(out[0][1], 10, 64); err == nil {
failure.atBlock = val
}
}
}

failure.lastAt = time.Now()
failure.lastError = err
failure.count++
Expand Down
3 changes: 2 additions & 1 deletion service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ func (s *Tier1Service) Blocks(request *pbsubstreamsrpc.Request, streamSrv pbsubs
strings.Join(request.DebugInitialStoreSnapshotForModules, ","),
)

if err := s.errorFromRecordedFailure(requestID); err != nil {
// s.resolveCursor
if err := s.errorFromRecordedFailure(requestID, request.ProductionMode, request.StartBlockNum, request.StartCursor); err != nil {
logger.Debug("failing fast on known failing request", zap.String("request_id", requestID))
return err
}
Expand Down

0 comments on commit 2c0028c

Please sign in to comment.